AcmeActor extracts primary/secondary URLs; Consul KV client for strings

This commit is contained in:
adrien 2021-09-16 15:03:33 +02:00
parent ea4f4f0b06
commit 39611ec0d4
6 changed files with 72 additions and 35 deletions

View file

@ -11,7 +11,7 @@ use crate::messages;
pub struct AcmeActor {
email: String,
last_ports: messages::PublicExposedPorts,
//last_ports: messages::PublicExposedPorts,
refresh: Duration,
rx_ports: watch::Receiver<messages::PublicExposedPorts>,
@ -29,7 +29,7 @@ impl AcmeActor {
let ctx = Self {
email: config.email,
last_ports: messages::PublicExposedPorts::new(),
//last_ports: messages::PublicExposedPorts::new(),
refresh: config.refresh_time,
rx_ports: rxp.clone(),
};
@ -40,29 +40,32 @@ impl AcmeActor {
pub async fn listen(&mut self) -> Result<()> {
let mut interval = time::interval(self.refresh);
loop {
// 1. Wait for an event
let new_ports = select! {
Some(ports) = self.rx_ports.recv() => Some(ports),
_ = interval.tick() => None,
else => return Ok(()) // Sender dropped, terminate loop.
};
// 2. Update last ports if needed
if let Some(p) = new_ports {
self.last_ports = p;
}
// 3. Flush IGD requests
match self.do_acme().await {
Ok(()) => debug!("Successfully updated ACME"),
Err(e) => error!("An error occured while updating ACME. {}", e),
select! {
Some(ports) = self.rx_ports.recv() => {
match self.do_acme(ports).await {
Ok(()) => debug!("Successfully updated ACME"),
Err(e) => error!("An error occured while updating ACME. {}", e),
}
},
_ = interval.tick() => continue,
else => break // Sender dropped, terminate loop.
}
}
Ok(())
}
pub async fn do_acme(&self) -> Result<()> {
debug!("Doing ACME!!!");
debug!("{:#?}", self.last_ports);
pub async fn do_acme(&self, ports: messages::PublicExposedPorts) -> Result<()> {
if ports.acme.is_empty() {
return Ok(());
}
let primary_url = &ports.acme[0];
let secondary_urls = &ports.acme[1..];
println!("Doing ACME!!!");
println!("Primary URL: {:?}", primary_url);
println!("Secondary URLs: {:?}", secondary_urls);
Ok(())
}

View file

@ -18,7 +18,7 @@ use crate::messages;
pub enum DiplonatParameter {
TcpPort(HashSet<u16>),
UdpPort(HashSet<u16>),
Acme(HashSet<String>),
Acme(Vec<String>),
}
#[derive(Serialize, Deserialize, Debug)]
@ -62,10 +62,6 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec<DiplonatConsul> {
}
fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
// let mut op = messages::PublicExposedPorts {
// tcp_ports: HashSet::new(),
// udp_ports: HashSet::new()
// };
let mut op = messages::PublicExposedPorts::new();
for conf in params {
@ -74,7 +70,7 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
match parameter {
DiplonatParameter::TcpPort(p) => op.tcp_ports.extend(p),
DiplonatParameter::UdpPort(p) => op.udp_ports.extend(p),
DiplonatParameter::Acme(urls) => op.acme.extend(urls.clone()),
DiplonatParameter::Acme(urls) => op.acme.extend_from_slice(urls.as_slice()),
};
}
}
@ -85,10 +81,6 @@ fn to_open_ports(params: &Vec<DiplonatConsul>) -> messages::PublicExposedPorts {
impl ConsulActor {
pub fn new(config: RuntimeConfigConsul) -> Self {
let (tx, rx) = watch::channel(messages::PublicExposedPorts::new());
// let (tx, rx) = watch::channel(messages::PublicExposedPorts{
// tcp_ports: HashSet::new(),
// udp_ports: HashSet::new()
// });
return Self {
consul: consul::Consul::new(&config.url),
@ -118,7 +110,7 @@ impl ConsulActor {
};
self.retries = 0;
let msg = to_open_ports(&to_parameters(&catalog));
debug!("Extracted configuration: {:#?}", msg);
debug!("Extracted configuration:\n{:#?}", msg);
self.tx_open_ports.broadcast(msg)?;
}

41
src/consul_kv.rs Normal file
View file

@ -0,0 +1,41 @@
use anyhow::{anyhow, Result};
pub struct ConsulKV {
client: reqwest::Client,
url: String,
}
impl ConsulKV {
pub fn new(url: &str) -> Self {
Self {
client: reqwest::Client::new(),
url: url.to_string(),
}
}
pub async fn get_string(&self, key: &str) -> Result<String> {
let url = format!("{}/v1/kv/{}?raw", self.url, key);
let resp = self.client.get(&url).send().await?;
if resp.status() != reqwest::StatusCode::OK {
return Err(anyhow!("{} returned {}", url, resp.status()));
}
match resp.text().await {
Ok(s) => Ok(s),
Err(e) => Err(anyhow!("{}", e)),
}
}
pub async fn put_string(&self, key: &str, value: String) -> Result<()> {
let url = format!("{}/v1/kv/{}", self.url, key);
let resp = self.client.put(&url).body(value).send().await?;
match resp.status() {
reqwest::StatusCode::OK => Ok(()),
s => Err(anyhow!("{} returned {}", url, s)),
}
}
}

View file

@ -86,7 +86,7 @@ impl FirewallActor {
let ports_to_open = messages::PublicExposedPorts {
tcp_ports: diff_tcp,
udp_ports: diff_udp,
acme: HashSet::new(),
acme: Vec::new(),
};
fw::open_ports(&self.ipt, ports_to_open)?;

View file

@ -2,6 +2,7 @@ mod acme_actor;
mod config;
mod consul;
mod consul_actor;
mod consul_kv;
mod diplonat;
mod fw;
mod fw_actor;

View file

@ -4,7 +4,7 @@ use std::collections::HashSet;
pub struct PublicExposedPorts {
pub tcp_ports: HashSet<u16>,
pub udp_ports: HashSet<u16>,
pub acme: HashSet<String>,
pub acme: Vec<String>,
}
impl PublicExposedPorts {
@ -12,7 +12,7 @@ impl PublicExposedPorts {
return Self {
tcp_ports: HashSet::new(),
udp_ports: HashSet::new(),
acme: HashSet::new(),
acme: Vec::new(),
};
}
}