From 8c43611eb5bbaeb42f19da8d8ed521df208bfada Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 21 May 2020 22:25:33 +0200 Subject: [PATCH] Broken Diplonat --- README.md | 24 ++++++++++++++++++ src/adapter.rs | 7 ------ src/config.rs | 48 ------------------------------------ src/diplonat.rs | 32 +++++++++++++++++------- src/environment_adapter.rs | 50 ++++++++++++++++++++++++++++++++++++++ src/gw.rs | 12 --------- src/igd_adapter.rs | 31 ++++++++++++++++++----- src/main.rs | 6 ++--- src/node_state.rs | 25 +++++++++++++++++++ 9 files changed, 150 insertions(+), 85 deletions(-) delete mode 100644 src/adapter.rs delete mode 100644 src/config.rs create mode 100644 src/environment_adapter.rs delete mode 100644 src/gw.rs create mode 100644 src/node_state.rs diff --git a/README.md b/README.md index af84a5c..257278d 100644 --- a/README.md +++ b/README.md @@ -21,3 +21,27 @@ export DIPLONAT_CONSUL_NODE_NAME="lheureduthe" export RUST_LOG=debug cargo run ``` + +## About Consul Catalog + + * We query the `/v1/catalog/node/` endpoint + * We can watch it thanks to [Blocking Queries](https://www.consul.io/api/features/blocking.html) + +eg: + +```bash +curl -vvv http://127.0.0.1:8500/v1/catalog/node/lheureduthe +# returns X-Consul-Index: 15 +curl -vvv http://127.0.0.1:8500/v1/catalog/node/lheureduthe?index=15 +``` + +Each time you do the request, the whole list of services bound to the node is returned. + + +To test the Consul Catalog part, you can do: + +```bash +consul agent -dev #in a separate terminal, if not already running +consul services register -name=example -port=1337 +consul services -id=example +``` diff --git a/src/adapter.rs b/src/adapter.rs deleted file mode 100644 index 567e1cd..0000000 --- a/src/adapter.rs +++ /dev/null @@ -1,7 +0,0 @@ -use anyhow::Result; -use crate::*; - -pub trait Adapter { - fn new(&self, parent: &diplonat::Diplonat) -> Result<()>; - fn run(&self) -> Result<()>; -} diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index c335e8d..0000000 --- a/src/config.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::env; -use anyhow::{Result, Context, anyhow}; -use log::*; - -pub struct DiplonatConfig { - pub private_ip: String, - pub consul_node_name: String, - pub consul_url: String, - pub refresh_time: u32, - pub expiration_time: u32 -} - -pub fn load_env() -> Result { - let epi = "DIPLONAT_PRIVATE_IP"; - let ert = "DIPLONAT_REFRESH_TIME"; - let eet = "DIPLONAT_EXPIRATION_TIME"; - let ecnd = "DIPLONAT_CONSUL_NODE_NAME"; - let ecu = "DIPLONAT_CONSUL_URL"; - - let config = DiplonatConfig { - consul_url: match env::var(ecu) { Ok(e) => e, Err(_) => "http://127.0.0.1:8500".to_string() }, - private_ip: env::var(epi) - .with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?, - refresh_time: env::var(ert) - .with_context(|| format!("{} env var must be defined, eg: 60", ert))? - .parse() - .with_context(|| format!("{} env var must be an integer, eg: 60", ert))?, - expiration_time: env::var(eet) - .with_context(|| format!("{} env var must be defined, eg: 300", eet))? - .parse() - .with_context(|| format!("{} env var must be an integer, eg: 300", eet))?, - consul_node_name: env::var(ecnd) - .with_context(|| format!("{} env var must be defined", ecnd))? - }; - - if config.refresh_time * 2 > config.expiration_time { - return Err(anyhow!("Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)", config.expiration_time, config.refresh_time)) - } - - info!("Consul URL: {}", config.consul_url); - info!("Consul node name: {}", config.consul_node_name); - info!("Private IP address: {}", config.private_ip); - info!("Refresh time: {} seconds", config.refresh_time); - info!("Expiration time: {} seconds", config.expiration_time); - return Ok(config); -} - - diff --git a/src/diplonat.rs b/src/diplonat.rs index a53a19e..01bd994 100644 --- a/src/diplonat.rs +++ b/src/diplonat.rs @@ -1,27 +1,41 @@ use anyhow::{Result, Context}; use tokio::sync::broadcast; use futures::future::try_join_all; -use crate::*; +use log::*; +use std::cell::Cell; + +use crate::environment_adapter::*; +use crate::igd_adapter::*; +use crate::node_state::*; pub struct Diplonat<'a> { - pub config: config::DiplonatConfig, - pub gateway: igd::aio::Gateway, pub notif: broadcast::Sender<()>, - pub public_ports: &'a[u16], - adapters: &'a[&'a dyn adapter::Adapter] + pub state: Cell, + + env: EnvironmentAdapter, + igd: IgdAdapter<'a>, } impl<'a> Diplonat<'a> { pub async fn new() -> Result> { let (tx, _) = broadcast::channel(1); + let ns = Cell::new(NodeState::new()); + + // we deliberately choose to init one after another let ctx = Diplonat { - config: config::load_env().context("Unable to read configuration from environment")?, - gateway: gw::get_gateway().await?, notif: tx, - public_ports: &[110, 111, 112], - adapters: &[] + state: ns, + + env: EnvironmentAdapter::new(&ns, &tx).await?, + igd: IgdAdapter::new(&ns, &tx).await? }; + info!("Consul URL: {:#?}", ns.consul_url); + info!("Consul node name: {:#?}", ns.consul_node_name); + info!("Private IP address: {:#?}", ns.private_ip); + info!("Refresh time: {:#?} seconds", ns.refresh_time); + info!("Expiration time: {:#?} seconds", ns.expiration_time); + return Ok(ctx); } diff --git a/src/environment_adapter.rs b/src/environment_adapter.rs new file mode 100644 index 0000000..e4fad70 --- /dev/null +++ b/src/environment_adapter.rs @@ -0,0 +1,50 @@ +use std::env; +use tokio::sync::broadcast; +use anyhow::{Result, Context, anyhow}; +use log::*; +use crate::diplonat::*; +use crate::node_state::*; +use std::cell::Cell; + +const epi: &'static str = "DIPLONAT_PRIVATE_IP"; +const ert: &'static str = "DIPLONAT_REFRESH_TIME"; +const eet: &'static str = "DIPLONAT_EXPIRATION_TIME"; +const ecnd: &'static str = "DIPLONAT_CONSUL_NODE_NAME"; +const ecu: &'static str = "DIPLONAT_CONSUL_URL"; + +pub struct EnvironmentAdapter {} + +impl EnvironmentAdapter { + pub async fn new(ns: &Cell, _: &broadcast::Sender<()>) -> Result { + ns.consul_node_name = Some(match env::var(ecu) { + Ok(e) => e, + Err(_) => "http://127.0.0.1:8500".to_string() + }); + + ns.private_ip = Some(env::var(epi) + .with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?); + + ns.refresh_time = Some(env::var(ert) + .with_context(|| format!("{} env var must be defined, eg: 60", ert))? + .parse() + .with_context(|| format!("{} env var must be an integer, eg: 60", ert))?); + + ns.expiration_time = Some(env::var(eet) + .with_context(|| format!("{} env var must be defined, eg: 300", eet))? + .parse() + .with_context(|| format!("{} env var must be an integer, eg: 300", eet))?); + + ns.consul_node_name = Some(env::var(ecnd) + .with_context(|| format!("{} env var must be defined", ecnd))?); + + match (ns.refresh_time, ns.expiration_time) { + (Some(rt), Some(et)) if rt * 2 <= et => debug!("Checked refresh time is lower than expiration time"), + (Some(rt), Some(et)) => return Err(anyhow!("Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)", rt, et)), + _ => return Err(anyhow!("Please define refresh time and expiration time")) + } + + return Ok(Self{}); + } +} + + diff --git a/src/gw.rs b/src/gw.rs deleted file mode 100644 index 4489cf9..0000000 --- a/src/gw.rs +++ /dev/null @@ -1,12 +0,0 @@ -use igd::aio::search_gateway; -use anyhow::{Result, Context}; -use log::*; - -pub async fn get_gateway() -> Result { - let gw = search_gateway(Default::default()) - .await - .context("Failed to find gateway")?; - - info!("Gateway: {}", gw); - return Ok(gw); -} diff --git a/src/igd_adapter.rs b/src/igd_adapter.rs index 3803d5f..6624ab3 100644 --- a/src/igd_adapter.rs +++ b/src/igd_adapter.rs @@ -1,11 +1,30 @@ -use crate::*; -use anyhow::Result; +use igd::aio::*; +use log::*; +use tokio::sync::broadcast; +use anyhow::{Result, Context}; +use std::cell::Cell; -pub struct IgdAdapter {} -impl adapter::Adapter for IgdAdapter { - fn new(&self, parent: &diplonat::Diplonat) -> Result<()> { - return Ok(()); +use crate::diplonat::*; +use crate::node_state::*; + +pub struct IgdAdapter<'a> { + state: &'a Cell, + gateway: Gateway, +} +impl<'a> IgdAdapter<'a> { + pub async fn new(ns: &'a Cell, send: &broadcast::Sender<()>) -> Result> { + let gw = search_gateway(Default::default()) + .await + .context("Failed to find gateway")?; + info!("Gateway: {}", gw); + + let ctx = Self { + state: ns, + gateway: gw + }; + return Ok(ctx); } + fn run(&self) -> Result<()> { return Ok(()); } diff --git a/src/main.rs b/src/main.rs index c4d8c0f..d5656aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ mod diplonat; -mod config; -mod gw; -mod adapter; +mod node_state; +mod environment_adapter; mod igd_adapter; //use std::net::SocketAddrV4; //use std::collections::HashMap; //use igd::PortMappingProtocol; use log::*; +use node_state::*; use diplonat::*; #[tokio::main] diff --git a/src/node_state.rs b/src/node_state.rs new file mode 100644 index 0000000..ecf7484 --- /dev/null +++ b/src/node_state.rs @@ -0,0 +1,25 @@ +pub struct NodeState { + pub consul_node_name: Option, + pub consul_url: Option, + + pub refresh_time: Option, + pub expiration_time: Option, + + pub public_ip: Option, + pub private_ip: Option, + pub public_ports: Vec, +} + +impl NodeState { + pub fn new() -> Self { + return Self { + consul_node_name: None, + consul_url: None, + refresh_time: None, + expiration_time: None, + public_ip: None, + private_ip: None, + public_ports: Vec::new() + }; + } +}