STUN actor: try to avoid ip address flapping #21
2 changed files with 78 additions and 49 deletions
|
@ -1,4 +1,4 @@
|
||||||
FROM rust:1.57-bullseye as builder
|
FROM rust:1.69-bullseye as builder
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y libssl-dev pkg-config
|
apt-get install -y libssl-dev pkg-config
|
||||||
|
|
|
@ -8,12 +8,24 @@ use serde::{Deserialize, Serialize};
|
||||||
use crate::config::{RuntimeConfigConsul, RuntimeConfigStun};
|
use crate::config::{RuntimeConfigConsul, RuntimeConfigStun};
|
||||||
use crate::consul;
|
use crate::consul;
|
||||||
|
|
||||||
|
/// If autodiscovery returns None but an address was obtained less than
|
||||||
|
/// this number of seconds ago (here 15 minutes), we keep that address
|
||||||
|
/// in the Consul db instead of insterting a None.
|
||||||
|
const PERSIST_SOME_RESULT_DURATION_SECS: u64 = 900;
|
||||||
|
|
||||||
pub struct StunActor {
|
pub struct StunActor {
|
||||||
node: String,
|
|
||||||
consul: consul::Consul,
|
consul: consul::Consul,
|
||||||
stun_server_v4: Option<SocketAddr>,
|
|
||||||
stun_server_v6: SocketAddr,
|
|
||||||
refresh_time: Duration,
|
refresh_time: Duration,
|
||||||
|
|
||||||
|
autodiscovery_v4: StunAutodiscovery,
|
||||||
|
autodiscovery_v6: StunAutodiscovery,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StunAutodiscovery {
|
||||||
|
consul_key: String,
|
||||||
|
is_ipv4: bool,
|
||||||
|
stun_server: Option<SocketAddr>,
|
||||||
|
last_result: Option<AutodiscoverResult>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
@ -34,78 +46,95 @@ impl StunActor {
|
||||||
.unwrap_or(true));
|
.unwrap_or(true));
|
||||||
assert!(stun_config.stun_server_v6.is_ipv6());
|
assert!(stun_config.stun_server_v6.is_ipv6());
|
||||||
|
|
||||||
|
let autodiscovery_v4 = StunAutodiscovery {
|
||||||
|
consul_key: format!("diplonat/autodiscovery/ipv4/{}", node),
|
||||||
|
is_ipv4: true,
|
||||||
|
stun_server: stun_config.stun_server_v4,
|
||||||
|
last_result: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let autodiscovery_v6 = StunAutodiscovery {
|
||||||
|
consul_key: format!("diplonat/autodiscovery/ipv6/{}", node),
|
||||||
|
is_ipv4: false,
|
||||||
|
stun_server: Some(stun_config.stun_server_v6),
|
||||||
|
last_result: None,
|
||||||
|
};
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
consul: consul::Consul::new(consul_config),
|
consul: consul::Consul::new(consul_config),
|
||||||
node: node.to_string(),
|
autodiscovery_v4,
|
||||||
stun_server_v4: stun_config.stun_server_v4,
|
autodiscovery_v6,
|
||||||
stun_server_v6: stun_config.stun_server_v6,
|
|
||||||
refresh_time: stun_config.refresh_time,
|
refresh_time: stun_config.refresh_time,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn listen(&mut self) -> Result<()> {
|
pub async fn listen(&mut self) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let ipv4_result = match self.stun_server_v4 {
|
if let Err(e) = self.autodiscovery_v4.do_iteration(&self.consul).await {
|
||||||
Some(stun_server_v4) => self.autodiscover_ip(stun_server_v4).await,
|
|
||||||
None => self.autodiscover_none_ipv4().await,
|
|
||||||
};
|
|
||||||
if let Err(e) = ipv4_result {
|
|
||||||
error!("Unable to autodiscover IPv4 address: {}", e);
|
error!("Unable to autodiscover IPv4 address: {}", e);
|
||||||
}
|
}
|
||||||
if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await {
|
|
||||||
|
if let Err(e) = self.autodiscovery_v6.do_iteration(&self.consul).await {
|
||||||
error!("Unable to autodiscover IPv6 address: {}", e);
|
error!("Unable to autodiscover IPv6 address: {}", e);
|
||||||
}
|
}
|
||||||
tokio::time::sleep(self.refresh_time).await;
|
tokio::time::sleep(self.refresh_time).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> {
|
impl StunAutodiscovery {
|
||||||
let binding_ip = match stun_server.is_ipv4() {
|
async fn do_iteration(&mut self, consul: &consul::Consul) -> Result<()> {
|
||||||
|
let binding_ip = match self.is_ipv4 {
|
||||||
true => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0
|
true => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0
|
||||||
false => IpAddr::V6(Ipv6Addr::UNSPECIFIED), // [::]
|
false => IpAddr::V6(Ipv6Addr::UNSPECIFIED), // [::]
|
||||||
};
|
};
|
||||||
let binding_addr = SocketAddr::new(binding_ip, 0);
|
let binding_addr = SocketAddr::new(binding_ip, 0);
|
||||||
|
|
||||||
let discovered_addr = get_mapped_addr(stun_server, binding_addr)
|
let discovered_addr = match self.stun_server {
|
||||||
.await?
|
Some(stun_server) => {
|
||||||
.map(|x| x.ip());
|
assert_eq!(self.is_ipv4, stun_server.is_ipv4());
|
||||||
|
|
||||||
let consul_key = match stun_server.is_ipv4() {
|
get_mapped_addr(stun_server, binding_addr)
|
||||||
true => {
|
.await?
|
||||||
debug!("Autodiscovered IPv4: {:?}", discovered_addr);
|
.map(|x| x.ip())
|
||||||
format!("diplonat/autodiscovery/ipv4/{}", self.node)
|
|
||||||
}
|
|
||||||
false => {
|
|
||||||
debug!("Autodiscovered IPv6: {:?}", discovered_addr);
|
|
||||||
format!("diplonat/autodiscovery/ipv6/{}", self.node)
|
|
||||||
}
|
}
|
||||||
|
None => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.consul
|
let now = timestamp();
|
||||||
.kv_put(
|
|
||||||
&consul_key,
|
if discovered_addr.is_none() {
|
||||||
serde_json::to_vec(&AutodiscoverResult {
|
if let Some(last_result) = &self.last_result {
|
||||||
timestamp: timestamp(),
|
if last_result.address.is_some()
|
||||||
address: discovered_addr,
|
&& now - last_result.timestamp <= PERSIST_SOME_RESULT_DURATION_SECS
|
||||||
})?,
|
{
|
||||||
)
|
// Keep non-None result that was obtained before by not
|
||||||
|
// writing/taking into account None result.
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_result = AutodiscoverResult {
|
||||||
|
timestamp: now,
|
||||||
|
address: discovered_addr,
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg = format!(
|
||||||
|
"STUN autodiscovery result: {} -> {:?}",
|
||||||
|
self.consul_key, discovered_addr
|
||||||
|
);
|
||||||
|
if self.last_result.as_ref().and_then(|x| x.address) != discovered_addr {
|
||||||
|
info!("{}", msg);
|
||||||
|
} else {
|
||||||
|
debug!("{}", msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
consul
|
||||||
|
.kv_put(&self.consul_key, serde_json::to_vec(¤t_result)?)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
self.last_result = Some(current_result);
|
||||||
}
|
|
||||||
|
|
||||||
async fn autodiscover_none_ipv4(&self) -> Result<()> {
|
|
||||||
let consul_key = format!("diplonat/autodiscovery/ipv4/{}", self.node);
|
|
||||||
|
|
||||||
self.consul
|
|
||||||
.kv_put(
|
|
||||||
&consul_key,
|
|
||||||
serde_json::to_vec(&AutodiscoverResult {
|
|
||||||
timestamp: timestamp(),
|
|
||||||
address: None,
|
|
||||||
})?,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue