diff --git a/README.md b/README.md index 6b8e9df..1911569 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,6 @@ To test the Consul Catalog part, you can do: ```bash consul agent -dev #in a separate terminal, if not already running -consul services register -name=example -port=1337 +consul services register -name=example -port=1337 -tag="diplonat_pub_port=1337" consul services -id=example ``` diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 0f09851..c87874d 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -1,15 +1,51 @@ use crate::consul::Consul; use tokio::sync::watch; +use tokio::time::delay_for; +use crate::messages; +use anyhow::Result; +use std::cmp; +use std::time::Duration; +use log::*; pub struct ConsulActor { - consul: Consul - out: + pub rx_open_ports: watch::Receiver, + + consul: Consul, + node: String, + retries: u32, + tx_open_ports: watch::Sender +} + +fn retry_to_time(retries: u32, max_time: Duration) -> Duration { + // 1.2^x seems to be a good value to exponentially increase time at a good pace + // eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5 minutes + return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64)) } impl ConsulActor { - fn new(url: &str) -> Self { + fn new(url: &str, node: &str) -> Self { + let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() }); return Self { - consul: Consul::new(url); + consul: Consul::new(url), + rx_open_ports: rx, + tx_open_ports: tx, + node: node.to_string(), + retries: 0, }; } + + async fn listen(&mut self) -> Result<()> { + loop { + let catalog = match self.consul.watch_node(&self.node).await { + Ok(c) => c, + Err(e) => { + self.retries = cmp::min(u32::MAX - 1, self.retries) + 1; + let will_retry_in = retry_to_time(self.retries, Duration::from_secs(600)); + error!("Failed to query consul. Will retry in {}s. {}", will_retry_in.as_secs(), e); + delay_for(will_retry_in).await; + continue; + } + }; + } + } } diff --git a/src/main.rs b/src/main.rs index 844bff9..4fee661 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod messages; mod consul; mod consul_actor; diff --git a/src/messages.rs b/src/messages.rs index e69de29..719d2c0 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -0,0 +1,4 @@ +#[derive(Debug, Clone)] +pub struct OpenPorts { + pub ports: Vec +}