Add STUN actor that saves autodiscovered IPv4/IPv6 to Consul
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
Alex 2023-04-04 18:46:14 +02:00
parent e64be9e881
commit 615f926618
14 changed files with 2030 additions and 324 deletions

866
Cargo.lock generated

File diff suppressed because it is too large Load diff

1265
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -21,3 +21,4 @@ serde = { version = "1.0.107", features = ["derive"] }
serde-lexpr = "0.1.1"
serde_json = "1.0.53"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "net", "macros"] }
stun-client = "0.1.2"

View file

@ -5,9 +5,10 @@ mod runtime;
pub use options::{ConfigOpts, ConfigOptsAcme, ConfigOptsBase, ConfigOptsConsul};
pub use runtime::{
RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd,
RuntimeConfig, RuntimeConfigAcme, RuntimeConfigConsul, RuntimeConfigFirewall, RuntimeConfigIgd, RuntimeConfigStun
};
pub const EXPIRATION_TIME: u16 = 300;
pub const REFRESH_TIME: u16 = 60;
pub const CONSUL_URL: &str = "http://127.0.0.1:8500";
pub const STUN_SERVER: &str = "stun.nextcloud.com:443";

View file

@ -17,6 +17,8 @@ pub struct ConfigOptsBase {
pub expiration_time: Option<u16>,
/// Refresh time for IGD and Firewall rules [default: 300]
pub refresh_time: Option<u16>,
/// STUN server [default: stun.nextcloud.com:443]
pub stun_server: Option<String>,
}
/// ACME configuration options
@ -69,7 +71,7 @@ impl ConfigOpts {
}
// Currently only used in tests
#[allow(dead_code)]
#[cfg(test)]
pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
where
Iter: IntoIterator<Item = (String, String)>,

View file

@ -20,6 +20,10 @@ fn minimal_valid_options() -> HashMap<String, String> {
fn all_valid_options() -> HashMap<String, String> {
let mut opts = minimal_valid_options();
opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
opts.insert(
"DIPLONAT_STUN_SERVER".to_string(),
"stun.nextcloud.com:443".to_string(),
);
opts.insert(
"DIPLONAT_PRIVATE_IP".to_string(),
"172.123.43.555".to_string(),

View file

@ -1,6 +1,7 @@
use std::fs::File;
use std::io::Read;
use std::time::Duration;
use std::net::{SocketAddr, ToSocketAddrs};
use anyhow::{anyhow, bail, Result};
@ -35,26 +36,36 @@ pub struct RuntimeConfigIgd {
pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfigStun {
pub stun_server_v4: SocketAddr,
pub stun_server_v6: SocketAddr,
pub refresh_time: Duration,
}
#[derive(Debug)]
pub struct RuntimeConfig {
pub acme: Option<RuntimeConfigAcme>,
pub consul: RuntimeConfigConsul,
pub firewall: RuntimeConfigFirewall,
pub igd: RuntimeConfigIgd,
pub stun: RuntimeConfigStun,
}
impl RuntimeConfig {
pub fn new(opts: ConfigOpts) -> Result<Self> {
let acme = RuntimeConfigAcme::new(opts.acme.clone())?;
let consul = RuntimeConfigConsul::new(opts.consul.clone())?;
let firewall = RuntimeConfigFirewall::new(opts.base.clone())?;
let igd = RuntimeConfigIgd::new(opts.base.clone())?;
let acme = RuntimeConfigAcme::new(opts.acme)?;
let consul = RuntimeConfigConsul::new(opts.consul)?;
let firewall = RuntimeConfigFirewall::new(&opts.base)?;
let igd = RuntimeConfigIgd::new(&opts.base)?;
let stun = RuntimeConfigStun::new(&opts.base)?;
Ok(Self {
acme,
consul,
firewall,
igd,
stun,
})
}
}
@ -115,7 +126,7 @@ impl RuntimeConfigConsul {
}
impl RuntimeConfigFirewall {
pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
Ok(Self { refresh_time })
@ -123,8 +134,8 @@ impl RuntimeConfigFirewall {
}
impl RuntimeConfigIgd {
pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
let private_ip = opts.private_ip;
pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
let private_ip = opts.private_ip.clone();
let expiration_time = Duration::from_secs(
opts
.expiration_time
@ -149,3 +160,26 @@ impl RuntimeConfigIgd {
})
}
}
impl RuntimeConfigStun {
pub(super) fn new(opts: &ConfigOptsBase) -> Result<Self> {
let mut stun_server_v4 = None;
let mut stun_server_v6 = None;
for addr in opts.stun_server.as_deref().unwrap_or(super::STUN_SERVER).to_socket_addrs()? {
if addr.is_ipv4() {
stun_server_v4 = Some(addr);
}
if addr.is_ipv6() {
stun_server_v6 = Some(addr);
}
}
let refresh_time = Duration::from_secs(opts.refresh_time.unwrap_or(super::REFRESH_TIME).into());
Ok(Self {
stun_server_v4: stun_server_v4.ok_or(anyhow!("Unable to resolve STUN server's IPv4 address"))?,
stun_server_v6: stun_server_v6.ok_or(anyhow!("Unable to resolve STUN server's IPv6 address"))?,
refresh_time,
})
}
}

View file

@ -7,12 +7,14 @@ use crate::config::RuntimeConfigConsul;
#[derive(Serialize, Deserialize, Debug)]
pub struct ServiceEntry {
pub Tags: Vec<String>,
#[serde(rename = "Tags")]
pub tags: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct CatalogNode {
pub Services: HashMap<String, ServiceEntry>,
#[serde(rename = "Services")]
pub services: HashMap<String, ServiceEntry>,
}
pub struct Consul {
@ -71,7 +73,14 @@ impl Consul {
None => return Err(anyhow!("X-Consul-Index header not found")),
};
let resp: CatalogNode = http.json().await?;
return Ok(resp);
let resp: Option<CatalogNode> = http.json().await?;
return Ok(resp.unwrap_or_default());
}
pub async fn kv_put(&self, key: &str, bytes: Vec<u8>) -> Result<()> {
let url = format!("{}/v1/kv/{}", self.url, key);
let http = self.client.put(&url).body(bytes).send().await?;
http.error_for_status()?;
Ok(())
}
}

View file

@ -11,13 +11,16 @@ use crate::{consul, messages};
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatParameter {
tcp_port(HashSet<u16>),
udp_port(HashSet<u16>),
#[serde(rename = "tcp_port")]
TcpPort(HashSet<u16>),
#[serde(rename = "udp_port")]
UdpPort(HashSet<u16>),
}
#[derive(Serialize, Deserialize, Debug)]
pub enum DiplonatConsul {
diplonat(Vec<DiplonatParameter>),
#[serde(rename = "diplonat")]
Diplonat(Vec<DiplonatParameter>),
}
pub struct ConsulActor {
@ -42,8 +45,8 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
let mut r = Vec::new();
for (_, service_info) in &catalog.Services {
for tag in &service_info.Tags {
for (_, service_info) in &catalog.services {
for tag in &service_info.tags {
let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
match diplo_conf {
Ok(conf) => r.push(conf),
@ -62,11 +65,11 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
};
for conf in params {
let DiplonatConsul::diplonat(c) = conf;
let DiplonatConsul::Diplonat(c) = conf;
for parameter in c {
match parameter {
DiplonatParameter::tcp_port(p) => op.tcp_ports.extend(p),
DiplonatParameter::udp_port(p) => op.udp_ports.extend(p),
DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
};
}
}

View file

@ -3,12 +3,14 @@ use tokio::try_join;
use crate::{
config::ConfigOpts, consul_actor::ConsulActor, fw_actor::FirewallActor, igd_actor::IgdActor,
stun_actor::StunActor,
};
pub struct Diplonat {
consul: ConsulActor,
firewall: FirewallActor,
igd: IgdActor,
stun: StunActor,
}
impl Diplonat {
@ -28,22 +30,30 @@ impl Diplonat {
)
.await?;
let sa = StunActor::new(
&rt_cfg.consul,
&rt_cfg.stun,
&rt_cfg.consul.node_name,
);
let ctx = Self {
consul: ca,
igd: ia,
firewall: fw,
stun: sa,
};
return Ok(ctx);
Ok(ctx)
}
pub async fn listen(&mut self) -> Result<()> {
try_join!(
self.consul.listen(),
self.igd.listen(),
self.firewall.listen()
self.firewall.listen(),
self.stun.listen(),
)?;
return Ok(());
Ok(())
}
}

View file

@ -11,6 +11,7 @@ pub fn setup(ipt: &iptables::IPTables) -> Result<()> {
// ensure we start from a clean state without any rule already set
cleanup(ipt)?;
info!("{}: creating DIPLONAT chain using", ipt.cmd);
ipt
.new_chain("filter", "DIPLONAT")
.context("Failed to create new chain")?;
@ -23,6 +24,7 @@ pub fn setup(ipt: &iptables::IPTables) -> Result<()> {
pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<()> {
for p in ports.tcp_ports {
info!("{}: opening TCP port {}", ipt.cmd, p);
ipt
.append(
"filter",
@ -33,6 +35,7 @@ pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts)
}
for p in ports.udp_ports {
info!("{}: opening UDP port {}", ipt.cmd, p);
ipt
.append(
"filter",
@ -80,6 +83,7 @@ pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result<messages::PublicExpo
pub fn cleanup(ipt: &iptables::IPTables) -> Result<()> {
if ipt.chain_exists("filter", "DIPLONAT")? {
info!("{}: removing old DIPLONAT chain", ipt.cmd);
ipt
.flush_chain("filter", "DIPLONAT")
.context("Failed to flush the DIPLONAT chain")?;

View file

@ -46,6 +46,7 @@ impl IgdActor {
gw.addr.ip()
),
};
#[allow(unused_parens)]
let public_ip = get_if_addrs::get_if_addrs()?
.into_iter()
.map(|i| i.addr.ip())

View file

@ -6,6 +6,7 @@ mod fw;
mod fw_actor;
mod igd_actor;
mod messages;
mod stun_actor;
use diplonat::Diplonat;
use log::*;
@ -15,6 +16,10 @@ async fn main() {
pretty_env_logger::init();
info!("Starting Diplonat");
let mut diplo = Diplonat::new().await.expect("Setup failed");
diplo.listen().await.expect("A runtime error occured");
Diplonat::new()
.await
.expect("Setup failed")
.listen()
.await
.expect("A runtime error occured");
}

95
src/stun_actor.rs Normal file
View file

@ -0,0 +1,95 @@
use std::net::{SocketAddr, IpAddr};
use std::time::{Duration, SystemTime};
use anyhow::{Result, bail, anyhow};
use log::*;
use serde::{Serialize, Deserialize};
use crate::config::{RuntimeConfigConsul, RuntimeConfigStun};
use crate::consul;
pub struct StunActor {
node: String,
consul: consul::Consul,
stun_server_v4: SocketAddr,
stun_server_v6: SocketAddr,
refresh_time: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct AutodiscoverResult {
pub timestamp: u64,
pub address: IpAddr,
}
impl StunActor {
pub fn new(consul_config: &RuntimeConfigConsul, stun_config: &RuntimeConfigStun, node: &str) -> Self {
assert!(stun_config.stun_server_v4.is_ipv4());
assert!(stun_config.stun_server_v6.is_ipv6());
Self {
consul: consul::Consul::new(consul_config),
node: node.to_string(),
stun_server_v4: stun_config.stun_server_v4,
stun_server_v6: stun_config.stun_server_v6,
refresh_time: stun_config.refresh_time,
}
}
pub async fn listen(&mut self) -> Result<()> {
loop {
if let Err(e) = self.autodiscover_ip(self.stun_server_v4).await {
error!("Unable to autodiscover IPv4 address: {}", e);
}
if let Err(e) = self.autodiscover_ip(self.stun_server_v6).await {
error!("Unable to autodiscover IPv6 address: {}", e);
}
tokio::time::sleep(self.refresh_time).await;
}
}
async fn autodiscover_ip(&self, stun_server: SocketAddr) -> Result<()> {
let binding_addr = match stun_server.is_ipv4() {
true => "0.0.0.0:34791".parse().unwrap(),
false => "[::]:34792".parse().unwrap(),
};
let discovered_addr = get_mapped_addr(stun_server, binding_addr).await?.ip();
let consul_key = match stun_server.is_ipv4() {
true => {
debug!("Autodiscovered IPv4: {}", discovered_addr);
format!("diplonat/autodiscovery/ipv4/{}", self.node)
}
false => {
debug!("Autodiscovered IPv6: {}", discovered_addr);
format!("diplonat/autodiscovery/ipv6/{}", self.node)
}
};
self.consul.kv_put(&consul_key, serde_json::to_vec(&AutodiscoverResult{
timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
address: discovered_addr,
})?).await?;
Ok(())
}
}
async fn get_mapped_addr(stun_server: SocketAddr, binding_addr: SocketAddr) -> Result<SocketAddr> {
use stun_client::*;
let mut client = Client::new(binding_addr, None).await.unwrap();
let res = client
.binding_request(stun_server, None)
.await
.unwrap();
if res.get_class() != Class::SuccessResponse {
bail!("STUN server did not responde with a success response");
}
let xor_mapped_addr = Attribute::get_xor_mapped_address(&res)
.ok_or(anyhow!("no XorMappedAddress found in STUN response"))?;
Ok(xor_mapped_addr)
}