From 0682c74e9d5083b43b3f83f8bb1ca747658d1455 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 7 Dec 2021 17:56:15 +0100 Subject: [PATCH] Watch multiple consul nodes --- Cargo.lock | 115 ++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/consul.rs | 33 ++++++++----- src/main.rs | 23 ++++++++- src/proxy_config.rs | 99 ++++++++++++++++++++++++++++++++------ 5 files changed, 245 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef90926..e297475 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "anyhow" version = "1.0.51" @@ -136,6 +145,21 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" +[[package]] +name = "clap" +version = "2.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + [[package]] name = "cloudabi" version = "0.0.3" @@ -491,6 +515,15 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -994,6 +1027,30 @@ dependencies = [ "log", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -1453,6 +1510,36 @@ dependencies = [ "bytes 0.4.12", ] +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "structopt" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c" +dependencies = [ + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "syn" version = "1.0.82" @@ -1487,6 +1574,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "time" version = "0.1.43" @@ -1796,6 +1892,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", + "structopt", "tokio 1.14.0", "tokio-rustls", "unicase", @@ -1831,6 +1928,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" + +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" + [[package]] name = "unicode-xid" version = "0.2.2" @@ -1880,6 +1989,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index d5f7aa9..5c6cb2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,4 @@ http = "0.2" hyper-reverse-proxy = "0.4" unicase = "2" lazy_static = "1.4" +structopt = "0.3" diff --git a/src/consul.rs b/src/consul.rs index 6fc031c..240c177 100644 --- a/src/consul.rs +++ b/src/consul.rs @@ -7,6 +7,12 @@ use reqwest::StatusCode; use serde::{Deserialize, Serialize}; // ---- Watch and retrieve Consul catalog ---- +// +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulNodeListNode { + #[serde(rename = "Node")] + pub node: String, +} #[derive(Serialize, Deserialize, Debug)] pub struct ConsulServiceEntry { @@ -59,8 +65,6 @@ pub struct Consul { url: String, kv_prefix: String, - idx: Option, - pub local_node: String, } @@ -70,29 +74,36 @@ impl Consul { client: reqwest::Client::new(), url: url.to_string(), kv_prefix: kv_prefix.to_string(), - idx: None, local_node: local_node.into(), }; } - pub fn watch_node_reset(&mut self) -> () { - self.idx = None; + pub async fn list_nodes(&self) -> Result> { + debug!("list_nodes"); + + let url = format!("{}/v1/catalog/nodes", self.url); + + let http = self.client.get(&url).send().await?; + let resp: Vec = http.json().await?; + Ok(resp.into_iter().map(|n| n.node).collect::>()) } - pub async fn watch_node(&mut self, host: &str) -> Result { - let url = match self.idx { + pub async fn watch_node(&self, host: &str, idx: Option) -> Result<(ConsulNodeCatalog, usize)> { + debug!("watch_node {} {:?}", host, idx); + + let url = match idx { Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), None => format!("{}/v1/catalog/node/{}", self.url, host), }; let http = self.client.get(&url).send().await?; - self.idx = match http.headers().get("X-Consul-Index") { - Some(v) => Some(v.to_str()?.parse::()?), - None => return Err(anyhow!("X-Consul-Index header not found")), + let new_idx = match http.headers().get("X-Consul-Index") { + Some(v) => v.to_str()?.parse::()?, + None => bail!("X-Consul-Index header not found"), }; let resp: ConsulNodeCatalog = http.json().await?; - return Ok(resp); + return Ok((resp, new_idx)); } // ---- KV get and put ---- diff --git a/src/main.rs b/src/main.rs index 9d710b2..4a0c0ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,8 @@ #[macro_use] extern crate anyhow; +use structopt::StructOpt; + mod cert; mod cert_store; mod consul; @@ -11,15 +13,34 @@ mod reverse_proxy; use log::*; +#[derive(StructOpt, Debug)] +#[structopt(name = "tricot")] +struct Opt { + /// Address of consul server + #[structopt(long = "consul-addr", env = "TRICOT_CONSUL_HOST", default_value = "http://127.0.0.1:8500/")] + pub consul_addr: String, + + /// Prefix of Tricot's entries in Consul KV space + #[structopt(long = "consul-kv-prefix", env = "TRICOT_CONSUL_KV_PREFIX", default_value = "tricot/")] + pub consul_kv_prefix: String, + + /// Node name + #[structopt(long = "node-name", env = "TRICOT_NODE_NAME", default_value = "")] + pub node_name: String, +} + #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() { if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "tricot=debug") } pretty_env_logger::init(); + + let opt = Opt::from_args(); + info!("Starting Tricot"); - let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou"); + let consul = consul::Consul::new(&opt.consul_addr, &opt.consul_kv_prefix, &opt.node_name); let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone()); let cert_store = cert_store::CertStore::new(consul.clone()); diff --git a/src/proxy_config.rs b/src/proxy_config.rs index ba58484..31a2659 100644 --- a/src/proxy_config.rs +++ b/src/proxy_config.rs @@ -1,7 +1,13 @@ use std::net::SocketAddr; use std::sync::{atomic, Arc}; +use std::collections::HashMap; use std::{cmp, time::Duration}; +use anyhow::Result; + +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::future::{BoxFuture}; + use log::*; use tokio::{sync::watch, time::sleep}; @@ -63,7 +69,7 @@ fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option { }) } -fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { +fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec { let mut entries = vec![]; for (_, svc) in catalog.services.iter() { @@ -79,37 +85,102 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig { } } - ProxyConfig { entries } + entries } -pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver> { +#[derive(Default)] +struct NodeWatchState { + last_idx: Option, + last_catalog: Option, + retries: u32, +} + +pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver> { let (tx, rx) = watch::channel(Arc::new(ProxyConfig { entries: Vec::new(), })); + + let consul = Arc::new(consul); tokio::spawn(async move { - let mut retries = 0; - let node = consul.local_node.clone(); + let mut nodes = HashMap::new(); + let mut watches = FuturesUnordered::)>>::new(); loop { - let catalog = match consul.watch_node(&node).await { - Ok(c) => c, + match consul.list_nodes().await { + Ok(consul_nodes) => { + info!("Watched consul nodes: {:?}", consul_nodes); + for node in consul_nodes { + if !nodes.contains_key(&node) { + nodes.insert(node.clone(), NodeWatchState::default()); + + let node = node.to_string(); + let consul = consul.clone(); + + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, None).await; + (node, res) + })); + } + } + } Err(e) => { - consul.watch_node_reset(); - retries = cmp::min(std::u32::MAX - 1, retries) + 1; - let will_retry_in = retry_to_time(retries, Duration::from_secs(600)); + warn!("Could not get Consul node list: {}", e); + } + } + + let (node, res): (String, Result<_>) = match watches.next().await { + Some(v) => v, + None => { + warn!("No nodes currently watched in proxy_config.rs"); + sleep(Duration::from_secs(10)).await; + continue; + } + }; + + match res { + Ok((catalog, new_idx)) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.last_idx = Some(new_idx); + watch_state.last_catalog = Some(catalog); + watch_state.retries = 0; + + let idx = watch_state.last_idx; + let consul = consul.clone(); + watches.push(Box::pin(async move { + let res = consul.watch_node(&node, idx).await; + (node, res) + })); + } + Err(e) => { + let mut watch_state = nodes.get_mut(&node).unwrap(); + watch_state.retries += 1; + watch_state.last_idx = None; + + let will_retry_in = retry_to_time(watch_state.retries, Duration::from_secs(600)); error!( "Failed to query consul. Will retry in {}s. {}", will_retry_in.as_secs(), e ); - sleep(will_retry_in).await; + + let consul = consul.clone(); + watches.push(Box::pin(async move { + sleep(will_retry_in).await; + let res = consul.watch_node(&node, None).await; + (node, res) + })); continue; } - }; - retries = 0; + } - let config = parse_consul_catalog(&catalog); + let mut entries = vec![]; + for (_, watch_state) in nodes.iter() { + if let Some(catalog) = &watch_state.last_catalog { + entries.extend(parse_consul_catalog(catalog)); + } + } + let config = ProxyConfig { entries }; debug!("Extracted configuration: {:#?}", config); tx.send(Arc::new(config)).expect("Internal error");