2023-04-21 11:51:21 +00:00
|
|
|
//! Fetch autodiscovered IP addresses stored by Diplonat in Consul
|
|
|
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::net::{Ipv4Addr, Ipv6Addr};
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::time::{Duration, SystemTime};
|
|
|
|
|
|
|
|
use anyhow::{anyhow, bail, Result};
|
|
|
|
use regex::Regex;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use tokio::{select, sync::watch};
|
|
|
|
use tracing::*;
|
|
|
|
|
|
|
|
use df_consul::*;
|
|
|
|
|
2023-04-21 12:14:07 +00:00
|
|
|
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
|
2023-04-21 11:51:21 +00:00
|
|
|
pub struct DiplonatAutodiscoveryResult<A> {
|
|
|
|
pub timestamp: u64,
|
|
|
|
pub address: Option<A>,
|
|
|
|
}
|
|
|
|
|
2023-04-21 12:14:07 +00:00
|
|
|
#[derive(Default, Debug, Eq, PartialEq)]
|
2023-04-21 11:51:21 +00:00
|
|
|
pub struct AutodiscoveredAddresses {
|
|
|
|
pub ipv4: HashMap<String, DiplonatAutodiscoveryResult<Ipv4Addr>>,
|
|
|
|
pub ipv6: HashMap<String, DiplonatAutodiscoveryResult<Ipv6Addr>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn watch_autodiscovered_ips(
|
|
|
|
consul: Consul,
|
|
|
|
mut must_exit: watch::Receiver<bool>,
|
|
|
|
) -> watch::Receiver<Arc<AutodiscoveredAddresses>> {
|
|
|
|
let (tx, rx) = watch::channel(Arc::new(AutodiscoveredAddresses::default()));
|
2023-04-21 12:14:07 +00:00
|
|
|
let rx2 = rx.clone();
|
2023-04-21 11:51:21 +00:00
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
let mut last_index = None;
|
2023-04-21 11:55:59 +00:00
|
|
|
let re = Regex::new(r".*autodiscovery/(ipv[46])/([^/]+)$").unwrap();
|
2023-04-21 11:51:21 +00:00
|
|
|
|
|
|
|
while !*must_exit.borrow() {
|
|
|
|
let r = select! {
|
|
|
|
_ = must_exit.changed() => continue,
|
|
|
|
r = consul.kv_get_prefix("diplonat/autodiscovery/", last_index) => r,
|
|
|
|
};
|
|
|
|
|
|
|
|
let entries = match r {
|
|
|
|
Err(e) => {
|
|
|
|
warn!("Error fetching diplonat autodiscovery consul prefix: {}", e);
|
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Ok(r) => {
|
|
|
|
last_index = Some(r.index());
|
|
|
|
r.into_inner()
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut addresses = AutodiscoveredAddresses::default();
|
|
|
|
|
|
|
|
for (k, v) in entries {
|
|
|
|
if let Err(e) = parse_autodiscovered_address(&re, &mut addresses, &k, &v) {
|
|
|
|
warn!(
|
|
|
|
"Invalid k/v pair in diplonat autodiscovery results: {} = {} ({})",
|
|
|
|
k,
|
|
|
|
std::str::from_utf8(&v).unwrap_or("<???>"),
|
|
|
|
e
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-21 12:14:07 +00:00
|
|
|
if addresses.strip_timestamps() != rx2.borrow().strip_timestamps() {
|
|
|
|
addresses.dump();
|
|
|
|
}
|
|
|
|
|
2023-04-21 11:51:21 +00:00
|
|
|
if tx.send(Arc::new(addresses)).is_err() {
|
|
|
|
info!("Autodiscovered addresses watcher terminating");
|
2023-04-21 12:14:07 +00:00
|
|
|
return;
|
2023-04-21 11:51:21 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
rx
|
|
|
|
}
|
|
|
|
|
|
|
|
fn parse_autodiscovered_address(
|
|
|
|
re: &Regex,
|
|
|
|
addresses: &mut AutodiscoveredAddresses,
|
|
|
|
k: &str,
|
|
|
|
v: &[u8],
|
|
|
|
) -> Result<()> {
|
|
|
|
let caps = re.captures(k).ok_or(anyhow!("key does not match regex"))?;
|
|
|
|
|
|
|
|
if let (Some(family), Some(node)) = (caps.get(1), caps.get(2)) {
|
|
|
|
match family.as_str() {
|
|
|
|
"ipv4" => {
|
|
|
|
let r: DiplonatAutodiscoveryResult<Ipv4Addr> = serde_json::from_slice(v)?;
|
|
|
|
addresses.ipv4.insert(node.as_str().to_string(), r);
|
|
|
|
}
|
|
|
|
"ipv6" => {
|
|
|
|
let r: DiplonatAutodiscoveryResult<Ipv6Addr> = serde_json::from_slice(v)?;
|
|
|
|
addresses.ipv6.insert(node.as_str().to_string(), r);
|
|
|
|
}
|
|
|
|
_ => bail!("invalid address family {}", family.as_str()),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
bail!("invalid regex captures {:?}", caps);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-04-21 12:14:07 +00:00
|
|
|
impl AutodiscoveredAddresses {
|
2023-04-21 13:58:46 +00:00
|
|
|
fn strip_timestamps(
|
|
|
|
&self,
|
|
|
|
) -> (
|
|
|
|
HashMap<&str, Option<Ipv4Addr>>,
|
|
|
|
HashMap<&str, Option<Ipv6Addr>>,
|
|
|
|
) {
|
|
|
|
(
|
|
|
|
self.ipv4
|
|
|
|
.iter()
|
|
|
|
.map(|(k, v)| (k.as_str(), v.address))
|
|
|
|
.collect(),
|
|
|
|
self.ipv6
|
|
|
|
.iter()
|
|
|
|
.map(|(k, v)| (k.as_str(), v.address))
|
|
|
|
.collect(),
|
|
|
|
)
|
2023-04-21 12:14:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn dump(&self) {
|
|
|
|
println!("---- Autodiscovered addresses (fetched from DiploNAT): ----");
|
|
|
|
for (k, v) in self.ipv4.iter() {
|
|
|
|
println!(" IPv4 {} {} {:?}", k, v.timestamp, v.address);
|
|
|
|
}
|
|
|
|
for (k, v) in self.ipv6.iter() {
|
|
|
|
println!(" IPv6 {} {} {:?}", k, v.timestamp, v.address);
|
|
|
|
}
|
|
|
|
println!("");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-21 11:51:21 +00:00
|
|
|
/// Returns the current system time as whole seconds elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics with the message `clock error` if the system clock reports a time
/// earlier than the Unix epoch.
pub fn timestamp() -> u64 {
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().expect("clock error");
    since_epoch.as_secs()
}
|