Merge pull request 'Allow Diplonat to automatically detect it's private IP' (#12) from autodetect-private-ip into main

Reviewed-on: #12
This commit is contained in:
adrien 2021-12-28 11:56:12 +01:00
commit 4560622fa1
11 changed files with 397 additions and 437 deletions

721
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -10,13 +10,14 @@ edition = "2018"
anyhow = "1.0.28"
envy = "0.4"
futures = "0.3.5"
igd = { version = "0.10.0", features = ["aio"] }
get_if_addrs = "0.5"
igd = { version = "0.12.0", features = ["aio"] }
iptables = "0.2.2"
log = "0.4"
pretty_env_logger = "0.4"
regex = "1"
reqwest = { version = "0.10", features = ["json"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0.107", features = ["derive"] }
serde-lexpr = "0.1.1"
serde_json = "1.0.53"
tokio = "0.2"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "net", "macros"] }

View file

@ -62,7 +62,9 @@ impl ConfigOpts {
// Currently only used in tests
#[allow(dead_code)]
pub fn from_iter<Iter: Clone>(iter: Iter) -> Result<RuntimeConfig>
where Iter: IntoIterator<Item = (String, String)> {
where
Iter: IntoIterator<Item = (String, String)>,
{
let base: ConfigOptsBase = envy::prefixed("DIPLONAT_").from_iter(iter.clone())?;
let consul: ConfigOptsConsul = envy::prefixed("DIPLONAT_CONSUL_").from_iter(iter.clone())?;
let acme: ConfigOptsAcme = envy::prefixed("DIPLONAT_ACME_").from_iter(iter.clone())?;

View file

@ -10,10 +10,6 @@ use crate::config::*;
fn minimal_valid_options() -> HashMap<String, String> {
let mut opts = HashMap::new();
opts.insert(
"DIPLONAT_PRIVATE_IP".to_string(),
"172.123.43.555".to_string(),
);
opts.insert(
"DIPLONAT_CONSUL_NODE_NAME".to_string(),
"consul_node".to_string(),
@ -24,6 +20,10 @@ fn minimal_valid_options() -> HashMap<String, String> {
fn all_valid_options() -> HashMap<String, String> {
let mut opts = minimal_valid_options();
opts.insert("DIPLONAT_EXPIRATION_TIME".to_string(), "30".to_string());
opts.insert(
"DIPLONAT_PRIVATE_IP".to_string(),
"172.123.43.555".to_string(),
);
opts.insert("DIPLONAT_REFRESH_TIME".to_string(), "10".to_string());
opts.insert(
"DIPLONAT_CONSUL_URL".to_string(),
@ -40,7 +40,6 @@ fn all_valid_options() -> HashMap<String, String> {
#[test]
#[should_panic]
fn err_empty_env() {
std::env::remove_var("DIPLONAT_PRIVATE_IP");
std::env::remove_var("DIPLONAT_CONSUL_NODE_NAME");
ConfigOpts::from_env().unwrap();
}
@ -60,10 +59,7 @@ fn ok_from_iter_minimal_valid_options() {
rt_config.firewall.refresh_time,
Duration::from_secs(REFRESH_TIME.into())
);
assert_eq!(
&rt_config.igd.private_ip,
opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
);
assert!(rt_config.igd.private_ip.is_none());
assert_eq!(
rt_config.igd.expiration_time,
Duration::from_secs(EXPIRATION_TIME.into())
@ -120,7 +116,7 @@ fn ok_from_iter_all_valid_options() {
);
assert_eq!(rt_config.firewall.refresh_time, refresh_time);
assert_eq!(
&rt_config.igd.private_ip,
&rt_config.igd.private_ip.unwrap(),
opts.get(&"DIPLONAT_PRIVATE_IP".to_string()).unwrap()
);
assert_eq!(rt_config.igd.expiration_time, expiration_time);

View file

@ -27,7 +27,7 @@ pub struct RuntimeConfigFirewall {
#[derive(Debug)]
pub struct RuntimeConfigIgd {
pub private_ip: String,
pub private_ip: Option<String>,
pub expiration_time: Duration,
pub refresh_time: Duration,
}
@ -59,7 +59,7 @@ impl RuntimeConfig {
impl RuntimeConfigAcme {
pub fn new(opts: ConfigOptsAcme) -> Result<Option<Self>> {
if !opts.enable {
return Ok(None)
return Ok(None);
}
let email = opts.email.expect(
@ -91,9 +91,7 @@ impl RuntimeConfigFirewall {
impl RuntimeConfigIgd {
pub(super) fn new(opts: ConfigOptsBase) -> Result<Self> {
let private_ip = opts
.private_ip
.expect("'DIPLONAT_PRIVATE_IP' environment variable is required");
let private_ip = opts.private_ip;
let expiration_time = Duration::from_secs(
opts
.expiration_time
@ -108,7 +106,7 @@ impl RuntimeConfigIgd {
(currently: {}s)",
expiration_time.as_secs(),
refresh_time.as_secs()
))
));
}
Ok(Self {

View file

@ -25,10 +25,12 @@ impl Consul {
client: reqwest::Client::new(),
url: url.to_string(),
idx: None,
}
};
}
pub fn watch_node_reset(&mut self) -> () { self.idx = None; }
pub fn watch_node_reset(&mut self) -> () {
self.idx = None;
}
pub async fn watch_node(&mut self, host: &str) -> Result<CatalogNode> {
let url = match self.idx {
@ -43,6 +45,6 @@ impl Consul {
};
let resp: CatalogNode = http.json().await?;
return Ok(resp)
return Ok(resp);
}
}

View file

@ -4,7 +4,7 @@ use anyhow::Result;
use log::*;
use serde::{Deserialize, Serialize};
use serde_lexpr::{error, from_str};
use tokio::{sync::watch, time::delay_for};
use tokio::{sync::watch, time::sleep};
use crate::{consul, messages};
@ -35,7 +35,7 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
return Duration::from_secs(cmp::min(
max_time.as_secs(),
1.2f64.powf(retries as f64) as u64,
))
));
}
fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
@ -51,7 +51,7 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
}
}
return r
return r;
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
@ -70,7 +70,7 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
}
}
return op
return op;
}
impl ConsulActor {
@ -86,7 +86,7 @@ impl ConsulActor {
tx_open_ports: tx,
node: node.to_string(),
retries: 0,
}
};
}
pub async fn listen(&mut self) -> Result<()> {
@ -102,15 +102,15 @@ impl ConsulActor {
will_retry_in.as_secs(),
e
);
delay_for(will_retry_in).await;
continue
sleep(will_retry_in).await;
continue;
}
};
self.retries = 0;
let msg = to_open_ports(&to_parameters(&catalog));
debug!("Extracted configuration: {:#?}", msg);
self.tx_open_ports.broadcast(msg)?;
self.tx_open_ports.send(msg)?;
}
}
}

View file

@ -21,7 +21,7 @@ impl Diplonat {
let fw = FirewallActor::new(rt_cfg.firewall.refresh_time, &ca.rx_open_ports).await?;
let ia = IgdActor::new(
&rt_cfg.igd.private_ip,
rt_cfg.igd.private_ip.as_ref().map(String::as_str),
rt_cfg.igd.refresh_time,
rt_cfg.igd.expiration_time,
&ca.rx_open_ports,
@ -34,7 +34,7 @@ impl Diplonat {
firewall: fw,
};
return Ok(ctx)
return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@ -44,6 +44,6 @@ impl Diplonat {
self.firewall.listen()
)?;
return Ok(())
return Ok(());
}
}

View file

@ -32,7 +32,7 @@ impl FirewallActor {
fw::setup(&ctx.ipt)?;
return Ok(ctx)
return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@ -40,7 +40,7 @@ impl FirewallActor {
loop {
// 1. Wait for an event
let new_ports = select! {
Some(ports) = self.rx_ports.recv() => Some(ports),
_ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
_ = interval.tick() => None,
else => return Ok(()) // Sender dropped, terminate loop.
};
@ -81,6 +81,6 @@ impl FirewallActor {
fw::open_ports(&self.ipt, ports_to_open)?;
return Ok(())
return Ok(());
}
}

View file

@ -22,7 +22,7 @@ pub struct IgdActor {
impl IgdActor {
pub async fn new(
priv_ip: &str,
priv_ip: Option<&str>,
refresh: Duration,
expire: Duration,
rxp: &watch::Receiver<messages::PublicExposedPorts>,
@ -32,16 +32,44 @@ impl IgdActor {
.context("Failed to find IGD gateway")?;
info!("IGD gateway: {}", gw);
let private_ip = if let Some(ip) = priv_ip {
info!("Using private IP from config: {}", ip);
ip.to_string()
} else {
info!("Trying to automatically detect private IP");
let gwa = gw.addr.ip().octets();
let cmplen = match gwa {
[192, 168, _, _] => 3,
[10, _, _, _] => 2,
_ => panic!(
"Gateway IP does not appear to be in a local network ({})",
gw.addr.ip()
),
};
let public_ip = get_if_addrs::get_if_addrs()?
.into_iter()
.map(|i| i.addr.ip())
.filter(|a| match a {
std::net::IpAddr::V4(a4) => (a4.octets()[..cmplen] == gwa[..cmplen]),
_ => false,
})
.next()
.expect("No interface has an IP on same subnet as gateway")
.to_string();
info!("Found private IP: {}", public_ip);
public_ip
};
let ctx = Self {
gateway: gw,
rx_ports: rxp.clone(),
private_ip: priv_ip.to_string(),
private_ip,
refresh: refresh,
expire: expire,
last_ports: messages::PublicExposedPorts::new(),
};
return Ok(ctx)
return Ok(ctx);
}
pub async fn listen(&mut self) -> Result<()> {
@ -49,7 +77,7 @@ impl IgdActor {
loop {
// 1. Wait for an event
let new_ports = select! {
Some(ports) = self.rx_ports.recv() => Some(ports),
_ = self.rx_ports.changed() => Some(self.rx_ports.borrow().clone()),
_ = interval.tick() => None,
else => return Ok(()) // Sender dropped, terminate loop.
};
@ -93,6 +121,6 @@ impl IgdActor {
}
}
return Ok(())
return Ok(());
}
}

View file

@ -11,6 +11,6 @@ impl PublicExposedPorts {
return Self {
tcp_ports: HashSet::new(),
udp_ports: HashSet::new(),
}
};
}
}