From 39611ec0d477e8a5fa958cece99b84f70ccda71d Mon Sep 17 00:00:00 2001 From: adrien Date: Thu, 16 Sep 2021 15:03:33 +0200 Subject: [PATCH] AcmeActor extracts primary/secondary URLs; Consul KV client for strings --- src/acme_actor.rs | 45 ++++++++++++++++++++++++--------------------- src/consul_actor.rs | 14 +++----------- src/consul_kv.rs | 41 +++++++++++++++++++++++++++++++++++++++++ src/fw_actor.rs | 2 +- src/main.rs | 1 + src/messages.rs | 4 ++-- 6 files changed, 72 insertions(+), 35 deletions(-) create mode 100644 src/consul_kv.rs diff --git a/src/acme_actor.rs b/src/acme_actor.rs index 0d7aec4..cd41b1f 100644 --- a/src/acme_actor.rs +++ b/src/acme_actor.rs @@ -11,7 +11,7 @@ use crate::messages; pub struct AcmeActor { email: String, - last_ports: messages::PublicExposedPorts, + //last_ports: messages::PublicExposedPorts, refresh: Duration, rx_ports: watch::Receiver, @@ -29,7 +29,7 @@ impl AcmeActor { let ctx = Self { email: config.email, - last_ports: messages::PublicExposedPorts::new(), + //last_ports: messages::PublicExposedPorts::new(), refresh: config.refresh_time, rx_ports: rxp.clone(), }; @@ -40,29 +40,32 @@ impl AcmeActor { pub async fn listen(&mut self) -> Result<()> { let mut interval = time::interval(self.refresh); loop { - // 1. Wait for an event - let new_ports = select! { - Some(ports) = self.rx_ports.recv() => Some(ports), - _ = interval.tick() => None, - else => return Ok(()) // Sender dropped, terminate loop. - }; - - // 2. Update last ports if needed - if let Some(p) = new_ports { - self.last_ports = p; - } - - // 3. Flush IGD requests - match self.do_acme().await { - Ok(()) => debug!("Successfully updated ACME"), - Err(e) => error!("An error occured while updating ACME. {}", e), + select! { + Some(ports) = self.rx_ports.recv() => { + match self.do_acme(ports).await { + Ok(()) => debug!("Successfully updated ACME"), + Err(e) => error!("An error occured while updating ACME. {}", e), + } + }, + _ = interval.tick() => continue, + else => break // Sender dropped, terminate loop. } } + + Ok(()) } - pub async fn do_acme(&self) -> Result<()> { - debug!("Doing ACME!!!"); - debug!("{:#?}", self.last_ports); + pub async fn do_acme(&self, ports: messages::PublicExposedPorts) -> Result<()> { + if ports.acme.is_empty() { + return Ok(()); + } + + let primary_url = &ports.acme[0]; + let secondary_urls = &ports.acme[1..]; + + println!("Doing ACME!!!"); + println!("Primary URL: {:?}", primary_url); + println!("Secondary URLs: {:?}", secondary_urls); Ok(()) } diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 136b248..1b80730 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -18,7 +18,7 @@ use crate::messages; pub enum DiplonatParameter { TcpPort(HashSet), UdpPort(HashSet), - Acme(HashSet), + Acme(Vec), } #[derive(Serialize, Deserialize, Debug)] @@ -62,10 +62,6 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec { } fn to_open_ports(params: &Vec) -> messages::PublicExposedPorts { - // let mut op = messages::PublicExposedPorts { - // tcp_ports: HashSet::new(), - // udp_ports: HashSet::new() - // }; let mut op = messages::PublicExposedPorts::new(); for conf in params { @@ -74,7 +70,7 @@ fn to_open_ports(params: &Vec) -> messages::PublicExposedPorts { match parameter { DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p), DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p), - DiplonatParameter::Acme(urls) => op.acme.extend(urls.clone()), + DiplonatParameter::Acme(urls) => op.acme.extend_from_slice(urls.as_slice()), }; } } @@ -85,10 +81,6 @@ fn to_open_ports(params: &Vec) -> messages::PublicExposedPorts { impl ConsulActor { pub fn new(config: RuntimeConfigConsul) -> Self { let (tx, rx) = watch::channel(messages::PublicExposedPorts::new()); - // let (tx, rx) = watch::channel(messages::PublicExposedPorts{ - // tcp_ports: HashSet::new(), - // udp_ports: HashSet::new() - // }); return Self { consul: consul::Consul::new(&config.url), @@ -118,7 +110,7 @@ impl ConsulActor { }; self.retries = 0; let msg = to_open_ports(&to_parameters(&catalog)); - debug!("Extracted configuration: {:#?}", msg); + debug!("Extracted configuration:\n{:#?}", msg); self.tx_open_ports.broadcast(msg)?; } diff --git a/src/consul_kv.rs b/src/consul_kv.rs new file mode 100644 index 0000000..5b3d0ef --- /dev/null +++ b/src/consul_kv.rs @@ -0,0 +1,41 @@ +use anyhow::{anyhow, Result}; + +pub struct ConsulKV { + client: reqwest::Client, + url: String, +} + +impl ConsulKV { + pub fn new(url: &str) -> Self { + Self { + client: reqwest::Client::new(), + url: url.to_string(), + } + } + + pub async fn get_string(&self, key: &str) -> Result { + let url = format!("{}/v1/kv/{}?raw", self.url, key); + + let resp = self.client.get(&url).send().await?; + + if resp.status() != reqwest::StatusCode::OK { + return Err(anyhow!("{} returned {}", url, resp.status())); + } + + match resp.text().await { + Ok(s) => Ok(s), + Err(e) => Err(anyhow!("{}", e)), + } + } + + pub async fn put_string(&self, key: &str, value: String) -> Result<()> { + let url = format!("{}/v1/kv/{}", self.url, key); + + let resp = self.client.put(&url).body(value).send().await?; + + match resp.status() { + reqwest::StatusCode::OK => Ok(()), + s => Err(anyhow!("{} returned {}", url, s)), + } + } +} diff --git a/src/fw_actor.rs b/src/fw_actor.rs index fa905c0..1649bc4 100644 --- a/src/fw_actor.rs +++ b/src/fw_actor.rs @@ -86,7 +86,7 @@ impl FirewallActor { let ports_to_open = messages::PublicExposedPorts { tcp_ports: diff_tcp, udp_ports: diff_udp, - acme: HashSet::new(), + acme: Vec::new(), }; fw::open_ports(&self.ipt, ports_to_open)?; diff --git a/src/main.rs b/src/main.rs index abcc628..1bb8d14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod acme_actor; mod config; mod consul; mod consul_actor; +mod consul_kv; mod diplonat; mod fw; mod fw_actor; diff --git a/src/messages.rs b/src/messages.rs index ea1629a..6c2ab38 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; pub struct PublicExposedPorts { pub tcp_ports: HashSet, pub udp_ports: HashSet, - pub acme: HashSet, + pub acme: Vec, } impl PublicExposedPorts { @@ -12,7 +12,7 @@ impl PublicExposedPorts { return Self { tcp_ports: HashSet::new(), udp_ports: HashSet::new(), - acme: HashSet::new(), + acme: Vec::new(), }; } }