//! Fetch autodiscoverd IP addresses stored by Diplonat into Consul use std::collections::HashMap; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; use std::time::{Duration, SystemTime}; use anyhow::{anyhow, bail, Result}; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::{select, sync::watch}; use tracing::*; use df_consul::*; #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] pub struct DiplonatAutodiscoveryResult { pub timestamp: u64, pub address: Option, } #[derive(Default, Debug, Eq, PartialEq)] pub struct AutodiscoveredAddresses { pub ipv4: HashMap>, pub ipv6: HashMap>, } pub fn watch_autodiscovered_ips( consul: Consul, mut must_exit: watch::Receiver, ) -> watch::Receiver> { let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default())); let rx2 = rx.clone(); tokio::spawn(async move { let mut last_index = None; let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$").unwrap(); while !*must_exit.borrow() { let r = select! { _ = must_exit.changed() => continue, r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r, }; let entries = match r { Err(e) => { warn!("Error fetching diplonat autodiscovery consul prefix: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; continue; } Ok(r) => { last_index = Some(r.index()); r.into_inner() } }; let mut addresses = AutodiscoveredAddresses::default(); for (k, v) in entries { if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) { warn!( "Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})", k, std::str::from_utf8(&v).unwrap_or(""), e ); } } if addresses.strip_timestamps() != rx2.borrow().strip_timestamps() { addresses.dump(); } if tx.send(Arc::new(addresses)).is_err() { info!("Autodiscovered addresses watcher terminating"); return; } } }); rx } fn parse_autodiscovered_address( re: &Regex, addresses: &mut AutodiscoveredAddresses, k: &str, v: &[u8], ) -> Result<()> { let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?; if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) { match family.as_str() { "ipv4" => { let r: DiplonatAutodiscoveryResult = serde_json::from_slice(v)?; addresses.ipv4.insert(node.as_str().to_string(), r); } "ipv6" => { let r: DiplonatAutodiscoveryResult = serde_json::from_slice(v)?; addresses.ipv6.insert(node.as_str().to_string(), r); } _ => bail!("invalid address family {}", family.as_str()), } } else { bail!("invalid regex captures {:?}", caps); } Ok(()) } impl AutodiscoveredAddresses { fn strip_timestamps( &self, ) -> ( HashMap<&str, Option>, HashMap<&str, Option>, ) { ( self.ipv4 .iter() .map(|(k, v)| (k.as_str(), v.address)) .collect(), self.ipv6 .iter() .map(|(k, v)| (k.as_str(), v.address)) .collect(), ) } fn dump(&self) { println!("---- Autodiscovered addresses (fetched from DiploNAT): ----"); for (k, v) in self.ipv4.iter() { println!(" IPv4 {} {} {:?}", k, v.timestamp, v.address); } for (k, v) in self.ipv6.iter() { println!(" IPv6 {} {} {:?}", k, v.timestamp, v.address); } println!(""); } } pub fn timestamp() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("clock error") .as_secs() }