Watch multiple consul nodes
This commit is contained in:
parent
bb77e7c459
commit
0682c74e9d
5 changed files with 245 additions and 26 deletions
115
Cargo.lock
generated
115
Cargo.lock
generated
|
@ -28,6 +28,15 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ansi_term"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
|
||||
dependencies = [
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.51"
|
||||
|
@ -136,6 +145,21 @@ version = "1.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e"
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "2.34.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
|
||||
dependencies = [
|
||||
"ansi_term",
|
||||
"atty",
|
||||
"bitflags",
|
||||
"strsim",
|
||||
"textwrap",
|
||||
"unicode-width",
|
||||
"vec_map",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cloudabi"
|
||||
version = "0.0.3"
|
||||
|
@ -491,6 +515,15 @@ version = "0.11.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
|
||||
dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
|
@ -994,6 +1027,30 @@ dependencies = [
|
|||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
|
@ -1453,6 +1510,36 @@ dependencies = [
|
|||
"bytes 0.4.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "strsim"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
|
||||
|
||||
[[package]]
|
||||
name = "structopt"
|
||||
version = "0.3.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b9788f4202aa75c240ecc9c15c65185e6a39ccdeb0fd5d008b98825464c87c"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"lazy_static",
|
||||
"structopt-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "structopt-derive"
|
||||
version = "0.4.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.82"
|
||||
|
@ -1487,6 +1574,15 @@ dependencies = [
|
|||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "textwrap"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
|
||||
dependencies = [
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.1.43"
|
||||
|
@ -1796,6 +1892,7 @@ dependencies = [
|
|||
"rustls-pemfile",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"structopt",
|
||||
"tokio 1.14.0",
|
||||
"tokio-rustls",
|
||||
"unicase",
|
||||
|
@ -1831,6 +1928,18 @@ dependencies = [
|
|||
"tinyvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-segmentation"
|
||||
version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-width"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-xid"
|
||||
version = "0.2.2"
|
||||
|
@ -1880,6 +1989,12 @@ version = "0.2.15"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||
|
||||
[[package]]
|
||||
name = "vec_map"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.3"
|
||||
|
|
|
@ -30,3 +30,4 @@ http = "0.2"
|
|||
hyper-reverse-proxy = "0.4"
|
||||
unicase = "2"
|
||||
lazy_static = "1.4"
|
||||
structopt = "0.3"
|
||||
|
|
|
@ -7,6 +7,12 @@ use reqwest::StatusCode;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// ---- Watch and retrieve Consul catalog ----
|
||||
//
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ConsulNodeListNode {
|
||||
#[serde(rename = "Node")]
|
||||
pub node: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ConsulServiceEntry {
|
||||
|
@ -59,8 +65,6 @@ pub struct Consul {
|
|||
url: String,
|
||||
kv_prefix: String,
|
||||
|
||||
idx: Option<u64>,
|
||||
|
||||
pub local_node: String,
|
||||
}
|
||||
|
||||
|
@ -70,29 +74,36 @@ impl Consul {
|
|||
client: reqwest::Client::new(),
|
||||
url: url.to_string(),
|
||||
kv_prefix: kv_prefix.to_string(),
|
||||
idx: None,
|
||||
local_node: local_node.into(),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn watch_node_reset(&mut self) -> () {
|
||||
self.idx = None;
|
||||
pub async fn list_nodes(&self) -> Result<Vec<String>> {
|
||||
debug!("list_nodes");
|
||||
|
||||
let url = format!("{}/v1/catalog/nodes", self.url);
|
||||
|
||||
let http = self.client.get(&url).send().await?;
|
||||
let resp: Vec<ConsulNodeListNode> = http.json().await?;
|
||||
Ok(resp.into_iter().map(|n| n.node).collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
pub async fn watch_node(&mut self, host: &str) -> Result<ConsulNodeCatalog> {
|
||||
let url = match self.idx {
|
||||
pub async fn watch_node(&self, host: &str, idx: Option<usize>) -> Result<(ConsulNodeCatalog, usize)> {
|
||||
debug!("watch_node {} {:?}", host, idx);
|
||||
|
||||
let url = match idx {
|
||||
Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
|
||||
None => format!("{}/v1/catalog/node/{}", self.url, host),
|
||||
};
|
||||
|
||||
let http = self.client.get(&url).send().await?;
|
||||
self.idx = match http.headers().get("X-Consul-Index") {
|
||||
Some(v) => Some(v.to_str()?.parse::<u64>()?),
|
||||
None => return Err(anyhow!("X-Consul-Index header not found")),
|
||||
let new_idx = match http.headers().get("X-Consul-Index") {
|
||||
Some(v) => v.to_str()?.parse::<usize>()?,
|
||||
None => bail!("X-Consul-Index header not found"),
|
||||
};
|
||||
|
||||
let resp: ConsulNodeCatalog = http.json().await?;
|
||||
return Ok(resp);
|
||||
return Ok((resp, new_idx));
|
||||
}
|
||||
|
||||
// ---- KV get and put ----
|
||||
|
|
23
src/main.rs
23
src/main.rs
|
@ -1,6 +1,8 @@
|
|||
#[macro_use]
|
||||
extern crate anyhow;
|
||||
|
||||
use structopt::StructOpt;
|
||||
|
||||
mod cert;
|
||||
mod cert_store;
|
||||
mod consul;
|
||||
|
@ -11,15 +13,34 @@ mod reverse_proxy;
|
|||
|
||||
use log::*;
|
||||
|
||||
#[derive(StructOpt, Debug)]
|
||||
#[structopt(name = "tricot")]
|
||||
struct Opt {
|
||||
/// Address of consul server
|
||||
#[structopt(long = "consul-addr", env = "TRICOT_CONSUL_HOST", default_value = "http://127.0.0.1:8500/")]
|
||||
pub consul_addr: String,
|
||||
|
||||
/// Prefix of Tricot's entries in Consul KV space
|
||||
#[structopt(long = "consul-kv-prefix", env = "TRICOT_CONSUL_KV_PREFIX", default_value = "tricot/")]
|
||||
pub consul_kv_prefix: String,
|
||||
|
||||
/// Node name
|
||||
#[structopt(long = "node-name", env = "TRICOT_NODE_NAME", default_value = "<none>")]
|
||||
pub node_name: String,
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||
async fn main() {
|
||||
if std::env::var("RUST_LOG").is_err() {
|
||||
std::env::set_var("RUST_LOG", "tricot=debug")
|
||||
}
|
||||
pretty_env_logger::init();
|
||||
|
||||
let opt = Opt::from_args();
|
||||
|
||||
info!("Starting Tricot");
|
||||
|
||||
let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou");
|
||||
let consul = consul::Consul::new(&opt.consul_addr, &opt.consul_kv_prefix, &opt.node_name);
|
||||
let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
|
||||
|
||||
let cert_store = cert_store::CertStore::new(consul.clone());
|
||||
|
|
|
@ -1,7 +1,13 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::{atomic, Arc};
|
||||
use std::collections::HashMap;
|
||||
use std::{cmp, time::Duration};
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use futures::future::{BoxFuture};
|
||||
|
||||
use log::*;
|
||||
use tokio::{sync::watch, time::sleep};
|
||||
|
||||
|
@ -63,7 +69,7 @@ fn parse_tricot_tag(target_addr: SocketAddr, tag: &str) -> Option<ProxyEntry> {
|
|||
})
|
||||
}
|
||||
|
||||
fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
|
||||
fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> Vec<ProxyEntry> {
|
||||
let mut entries = vec![];
|
||||
|
||||
for (_, svc) in catalog.services.iter() {
|
||||
|
@ -79,37 +85,102 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
|
|||
}
|
||||
}
|
||||
|
||||
ProxyConfig { entries }
|
||||
entries
|
||||
}
|
||||
|
||||
pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
|
||||
#[derive(Default)]
|
||||
struct NodeWatchState {
|
||||
last_idx: Option<usize>,
|
||||
last_catalog: Option<ConsulNodeCatalog>,
|
||||
retries: u32,
|
||||
}
|
||||
|
||||
pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
|
||||
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
|
||||
entries: Vec::new(),
|
||||
}));
|
||||
|
||||
let consul = Arc::new(consul);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut retries = 0;
|
||||
let node = consul.local_node.clone();
|
||||
let mut nodes = HashMap::new();
|
||||
let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
|
||||
|
||||
loop {
|
||||
let catalog = match consul.watch_node(&node).await {
|
||||
Ok(c) => c,
|
||||
match consul.list_nodes().await {
|
||||
Ok(consul_nodes) => {
|
||||
info!("Watched consul nodes: {:?}", consul_nodes);
|
||||
for node in consul_nodes {
|
||||
if !nodes.contains_key(&node) {
|
||||
nodes.insert(node.clone(), NodeWatchState::default());
|
||||
|
||||
let node = node.to_string();
|
||||
let consul = consul.clone();
|
||||
|
||||
watches.push(Box::pin(async move {
|
||||
let res = consul.watch_node(&node, None).await;
|
||||
(node, res)
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
consul.watch_node_reset();
|
||||
retries = cmp::min(std::u32::MAX - 1, retries) + 1;
|
||||
let will_retry_in = retry_to_time(retries, Duration::from_secs(600));
|
||||
warn!("Could not get Consul node list: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let (node, res): (String, Result<_>) = match watches.next().await {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
warn!("No nodes currently watched in proxy_config.rs");
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok((catalog, new_idx)) => {
|
||||
let mut watch_state = nodes.get_mut(&node).unwrap();
|
||||
watch_state.last_idx = Some(new_idx);
|
||||
watch_state.last_catalog = Some(catalog);
|
||||
watch_state.retries = 0;
|
||||
|
||||
let idx = watch_state.last_idx;
|
||||
let consul = consul.clone();
|
||||
watches.push(Box::pin(async move {
|
||||
let res = consul.watch_node(&node, idx).await;
|
||||
(node, res)
|
||||
}));
|
||||
}
|
||||
Err(e) => {
|
||||
let mut watch_state = nodes.get_mut(&node).unwrap();
|
||||
watch_state.retries += 1;
|
||||
watch_state.last_idx = None;
|
||||
|
||||
let will_retry_in = retry_to_time(watch_state.retries, Duration::from_secs(600));
|
||||
error!(
|
||||
"Failed to query consul. Will retry in {}s. {}",
|
||||
will_retry_in.as_secs(),
|
||||
e
|
||||
);
|
||||
|
||||
let consul = consul.clone();
|
||||
watches.push(Box::pin(async move {
|
||||
sleep(will_retry_in).await;
|
||||
let res = consul.watch_node(&node, None).await;
|
||||
(node, res)
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
retries = 0;
|
||||
}
|
||||
|
||||
let config = parse_consul_catalog(&catalog);
|
||||
let mut entries = vec![];
|
||||
for (_, watch_state) in nodes.iter() {
|
||||
if let Some(catalog) = &watch_state.last_catalog {
|
||||
entries.extend(parse_consul_catalog(catalog));
|
||||
}
|
||||
}
|
||||
let config = ProxyConfig { entries };
|
||||
debug!("Extracted configuration: {:#?}", config);
|
||||
|
||||
tx.send(Arc::new(config)).expect("Internal error");
|
||||
|
|
Loading…
Reference in a new issue