use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::time::{Duration, SystemTime}; use anyhow::{anyhow, bail, Result}; use log::*; use serde::{Deserialize, Serialize}; use crate::config::{RuntimeConfigConsul, RuntimeConfigStun}; use crate::consul; /// If autodiscovery returns None but an address was obtained less than /// this number of seconds ago (here 15 minutes), we keep that address /// in the Consul db instead of insterting a None. const PERSIST_SOME_RESULT_DURATION_SECS: u64 = 900; pub struct StunActor { consul: consul::Consul, refresh_time: Duration, autodiscovery_v4: StunAutodiscovery, autodiscovery_v6: StunAutodiscovery, } pub struct StunAutodiscovery { consul_key: String, is_ipv4: bool, stun_server: Option, last_result: Option, } #[derive(Serialize, Deserialize, Debug)] pub struct AutodiscoverResult { pub timestamp: u64, pub address: Option, } impl StunActor { pub fn new( consul_config: &RuntimeConfigConsul, stun_config: &RuntimeConfigStun, node: &str, ) -> Self { assert!(stun_config .stun_server_v4 .map(|x| x.is_ipv4()) .unwrap_or(true)); assert!(stun_config.stun_server_v6.is_ipv6()); let autodiscovery_v4 = StunAutodiscovery { consul_key: format!("diplonat/autodiscovery/ipv4/{}", node), is_ipv4: true, stun_server: stun_config.stun_server_v4, last_result: None, }; let autodiscovery_v6 = StunAutodiscovery { consul_key: format!("diplonat/autodiscovery/ipv6/{}", node), is_ipv4: false, stun_server: Some(stun_config.stun_server_v6), last_result: None, }; Self { consul: consul::Consul::new(consul_config), autodiscovery_v4, autodiscovery_v6, refresh_time: stun_config.refresh_time, } } pub async fn listen(&mut self) -> Result<()> { loop { if let Err(e) = self.autodiscovery_v4.do_iteration(&self.consul).await { error!("Unable to autodiscover IPv4 address: {}", e); } if let Err(e) = self.autodiscovery_v6.do_iteration(&self.consul).await { error!("Unable to autodiscover IPv6 address: {}", e); } tokio::time::sleep(self.refresh_time).await; } } } impl StunAutodiscovery { async fn do_iteration(&mut self, consul: &consul::Consul) -> Result<()> { let binding_ip = match self.is_ipv4 { true => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0 false => IpAddr::V6(Ipv6Addr::UNSPECIFIED), // [::] }; let binding_addr = SocketAddr::new(binding_ip, 0); let discovered_addr = match self.stun_server { Some(stun_server) => { assert_eq!(self.is_ipv4, stun_server.is_ipv4()); get_mapped_addr(stun_server, binding_addr) .await? .map(|x| x.ip()) } None => None, }; let now = timestamp(); if discovered_addr.is_none() { if let Some(last_result) = &self.last_result { if last_result.address.is_some() && now - last_result.timestamp <= PERSIST_SOME_RESULT_DURATION_SECS { // Keep non-None result that was obtained before by not // writing/taking into account None result. return Ok(()); } } } let current_result = AutodiscoverResult { timestamp: now, address: discovered_addr, }; let msg = format!( "STUN autodiscovery result: {} -> {:?}", self.consul_key, discovered_addr ); if self.last_result.as_ref().and_then(|x| x.address) != discovered_addr { info!("{}", msg); } else { debug!("{}", msg); } consul .kv_put(&self.consul_key, serde_json::to_vec(¤t_result)?) .await?; self.last_result = Some(current_result); Ok(()) } } async fn get_mapped_addr( stun_server: SocketAddr, binding_addr: SocketAddr, ) -> Result> { use stun_client::*; let mut client = Client::new(binding_addr, None).await?; let res = match client.binding_request(stun_server, None).await { Err(e) => { info!( "STUN binding request to {} failed, assuming no address (error: {})", binding_addr, e ); return Ok(None); } Ok(r) => r, }; if res.get_class() != Class::SuccessResponse { bail!("STUN server did not responde with a success response"); } let xor_mapped_addr = Attribute::get_xor_mapped_address(&res) .ok_or(anyhow!("no XorMappedAddress found in STUN response"))?; Ok(Some(xor_mapped_addr)) } fn timestamp() -> u64 { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("clock error") .as_secs() }