//! Fetch autodiscoverd IP addresses stored by Diplonat into Consul use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; use std::time::{Duration, SystemTime}; use anyhow::{anyhow, bail, Result}; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::{select, sync::watch}; use tracing::*; use df_consul::*; #[derive(Serialize, Deserialize, Debug)] pub struct DiplonatAutodiscoveryResult { pub timestamp: u64, pub address: Option, } #[derive(Default, Debug)] pub struct AutodiscoveredAddresses { pub ipv4: HashMap>, pub ipv6: HashMap>, } pub fn watch_autodiscovered_ips( consul: Consul, mut must_exit: watch::Receiver, ) -> watch::Receiver> { let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default())); tokio::spawn(async move { let mut last_index = None; let re = Regex::new(r".*autodiscovery/(\w+)/(\w+)$").unwrap(); while !*must_exit.borrow() { let r = select! { _ = must_exit.changed() => continue, r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r, }; let entries = match r { Err(e) => { warn!("Error fetching diplonat autodiscovery consul prefix: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; continue; } Ok(r) => { last_index = Some(r.index()); r.into_inner() } }; let mut addresses = AutodiscoveredAddresses::default(); for (k, v) in entries { if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) { warn!( "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})", k, std::str::from_utf8(&v).unwrap_or(""), e ); } } if tx.send(Arc::new(addresses)).is_err() { info!("Autodiscovered addresses watcher terminating"); } } }); rx } fn parse_autodiscovered_address( re: &Regex, addresses: &mut AutodiscoveredAddresses, k: &str, v: &[u8], ) -> Result<()> { let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?; if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) { match family.as_str() { "ipv4" => { let r: DiplonatAutodiscoveryResult = serde_json::from_slice(v)?; addresses.ipv4.insert(node.as_str().to_string(), r); } "ipv6" => { let r: DiplonatAutodiscoveryResult = serde_json::from_slice(v)?; addresses.ipv6.insert(node.as_str().to_string(), r); } _ => bail!("invalid address family {}", family.as_str()), } } else { bail!("invalid regex captures {:?}", caps); } Ok(()) } pub fn timestamp() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("clock error") .as_secs() }