Broken Diplonat
This commit is contained in:
parent
4da459ce8b
commit
8c43611eb5
9 changed files with 150 additions and 85 deletions
24
README.md
24
README.md
|
@ -21,3 +21,27 @@ export DIPLONAT_CONSUL_NODE_NAME="lheureduthe"
|
||||||
export RUST_LOG=debug
|
export RUST_LOG=debug
|
||||||
cargo run
|
cargo run
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## About Consul Catalog
|
||||||
|
|
||||||
|
* We query the `/v1/catalog/node/<node>` endpoint
|
||||||
|
* We can watch it thanks to [Blocking Queries](https://www.consul.io/api/features/blocking.html)
|
||||||
|
|
||||||
|
eg:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -vvv http://127.0.0.1:8500/v1/catalog/node/lheureduthe
|
||||||
|
# returns X-Consul-Index: 15
|
||||||
|
curl -vvv http://127.0.0.1:8500/v1/catalog/node/lheureduthe?index=15
|
||||||
|
```
|
||||||
|
|
||||||
|
Each time you do the request, the whole list of services bound to the node is returned.
|
||||||
|
|
||||||
|
|
||||||
|
To test the Consul Catalog part, you can do:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
consul agent -dev #in a separate terminal, if not already running
|
||||||
|
consul services register -name=example -port=1337
|
||||||
|
consul services -id=example
|
||||||
|
```
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
use anyhow::Result;
|
|
||||||
use crate::*;
|
|
||||||
|
|
||||||
pub trait Adapter {
|
|
||||||
fn new(&self, parent: &diplonat::Diplonat) -> Result<()>;
|
|
||||||
fn run(&self) -> Result<()>;
|
|
||||||
}
|
|
|
@ -1,48 +0,0 @@
|
||||||
use std::env;
|
|
||||||
use anyhow::{Result, Context, anyhow};
|
|
||||||
use log::*;
|
|
||||||
|
|
||||||
pub struct DiplonatConfig {
|
|
||||||
pub private_ip: String,
|
|
||||||
pub consul_node_name: String,
|
|
||||||
pub consul_url: String,
|
|
||||||
pub refresh_time: u32,
|
|
||||||
pub expiration_time: u32
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_env() -> Result<DiplonatConfig> {
|
|
||||||
let epi = "DIPLONAT_PRIVATE_IP";
|
|
||||||
let ert = "DIPLONAT_REFRESH_TIME";
|
|
||||||
let eet = "DIPLONAT_EXPIRATION_TIME";
|
|
||||||
let ecnd = "DIPLONAT_CONSUL_NODE_NAME";
|
|
||||||
let ecu = "DIPLONAT_CONSUL_URL";
|
|
||||||
|
|
||||||
let config = DiplonatConfig {
|
|
||||||
consul_url: match env::var(ecu) { Ok(e) => e, Err(_) => "http://127.0.0.1:8500".to_string() },
|
|
||||||
private_ip: env::var(epi)
|
|
||||||
.with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?,
|
|
||||||
refresh_time: env::var(ert)
|
|
||||||
.with_context(|| format!("{} env var must be defined, eg: 60", ert))?
|
|
||||||
.parse()
|
|
||||||
.with_context(|| format!("{} env var must be an integer, eg: 60", ert))?,
|
|
||||||
expiration_time: env::var(eet)
|
|
||||||
.with_context(|| format!("{} env var must be defined, eg: 300", eet))?
|
|
||||||
.parse()
|
|
||||||
.with_context(|| format!("{} env var must be an integer, eg: 300", eet))?,
|
|
||||||
consul_node_name: env::var(ecnd)
|
|
||||||
.with_context(|| format!("{} env var must be defined", ecnd))?
|
|
||||||
};
|
|
||||||
|
|
||||||
if config.refresh_time * 2 > config.expiration_time {
|
|
||||||
return Err(anyhow!("Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)", config.expiration_time, config.refresh_time))
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Consul URL: {}", config.consul_url);
|
|
||||||
info!("Consul node name: {}", config.consul_node_name);
|
|
||||||
info!("Private IP address: {}", config.private_ip);
|
|
||||||
info!("Refresh time: {} seconds", config.refresh_time);
|
|
||||||
info!("Expiration time: {} seconds", config.expiration_time);
|
|
||||||
return Ok(config);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -1,27 +1,41 @@
|
||||||
use anyhow::{Result, Context};
|
use anyhow::{Result, Context};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use crate::*;
|
use log::*;
|
||||||
|
use std::cell::Cell;
|
||||||
|
|
||||||
|
use crate::environment_adapter::*;
|
||||||
|
use crate::igd_adapter::*;
|
||||||
|
use crate::node_state::*;
|
||||||
|
|
||||||
pub struct Diplonat<'a> {
|
pub struct Diplonat<'a> {
|
||||||
pub config: config::DiplonatConfig,
|
|
||||||
pub gateway: igd::aio::Gateway,
|
|
||||||
pub notif: broadcast::Sender<()>,
|
pub notif: broadcast::Sender<()>,
|
||||||
pub public_ports: &'a[u16],
|
pub state: Cell<NodeState>,
|
||||||
adapters: &'a[&'a dyn adapter::Adapter]
|
|
||||||
|
env: EnvironmentAdapter,
|
||||||
|
igd: IgdAdapter<'a>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Diplonat<'a> {
|
impl<'a> Diplonat<'a> {
|
||||||
pub async fn new() -> Result<Diplonat<'a>> {
|
pub async fn new() -> Result<Diplonat<'a>> {
|
||||||
let (tx, _) = broadcast::channel(1);
|
let (tx, _) = broadcast::channel(1);
|
||||||
|
let ns = Cell::new(NodeState::new());
|
||||||
|
|
||||||
|
// we deliberately choose to init one after another
|
||||||
let ctx = Diplonat {
|
let ctx = Diplonat {
|
||||||
config: config::load_env().context("Unable to read configuration from environment")?,
|
|
||||||
gateway: gw::get_gateway().await?,
|
|
||||||
notif: tx,
|
notif: tx,
|
||||||
public_ports: &[110, 111, 112],
|
state: ns,
|
||||||
adapters: &[]
|
|
||||||
|
env: EnvironmentAdapter::new(&ns, &tx).await?,
|
||||||
|
igd: IgdAdapter::new(&ns, &tx).await?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!("Consul URL: {:#?}", ns.consul_url);
|
||||||
|
info!("Consul node name: {:#?}", ns.consul_node_name);
|
||||||
|
info!("Private IP address: {:#?}", ns.private_ip);
|
||||||
|
info!("Refresh time: {:#?} seconds", ns.refresh_time);
|
||||||
|
info!("Expiration time: {:#?} seconds", ns.expiration_time);
|
||||||
|
|
||||||
return Ok(ctx);
|
return Ok(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
50
src/environment_adapter.rs
Normal file
50
src/environment_adapter.rs
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
use std::env;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use anyhow::{Result, Context, anyhow};
|
||||||
|
use log::*;
|
||||||
|
use crate::diplonat::*;
|
||||||
|
use crate::node_state::*;
|
||||||
|
use std::cell::Cell;
|
||||||
|
|
||||||
|
const epi: &'static str = "DIPLONAT_PRIVATE_IP";
|
||||||
|
const ert: &'static str = "DIPLONAT_REFRESH_TIME";
|
||||||
|
const eet: &'static str = "DIPLONAT_EXPIRATION_TIME";
|
||||||
|
const ecnd: &'static str = "DIPLONAT_CONSUL_NODE_NAME";
|
||||||
|
const ecu: &'static str = "DIPLONAT_CONSUL_URL";
|
||||||
|
|
||||||
|
pub struct EnvironmentAdapter {}
|
||||||
|
|
||||||
|
impl EnvironmentAdapter {
|
||||||
|
pub async fn new(ns: &Cell<NodeState>, _: &broadcast::Sender<()>) -> Result<Self> {
|
||||||
|
ns.consul_node_name = Some(match env::var(ecu) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => "http://127.0.0.1:8500".to_string()
|
||||||
|
});
|
||||||
|
|
||||||
|
ns.private_ip = Some(env::var(epi)
|
||||||
|
.with_context(|| format!("{} env var must be defined, eg: 192.168.0.18", epi))?);
|
||||||
|
|
||||||
|
ns.refresh_time = Some(env::var(ert)
|
||||||
|
.with_context(|| format!("{} env var must be defined, eg: 60", ert))?
|
||||||
|
.parse()
|
||||||
|
.with_context(|| format!("{} env var must be an integer, eg: 60", ert))?);
|
||||||
|
|
||||||
|
ns.expiration_time = Some(env::var(eet)
|
||||||
|
.with_context(|| format!("{} env var must be defined, eg: 300", eet))?
|
||||||
|
.parse()
|
||||||
|
.with_context(|| format!("{} env var must be an integer, eg: 300", eet))?);
|
||||||
|
|
||||||
|
ns.consul_node_name = Some(env::var(ecnd)
|
||||||
|
.with_context(|| format!("{} env var must be defined", ecnd))?);
|
||||||
|
|
||||||
|
match (ns.refresh_time, ns.expiration_time) {
|
||||||
|
(Some(rt), Some(et)) if rt * 2 <= et => debug!("Checked refresh time is lower than expiration time"),
|
||||||
|
(Some(rt), Some(et)) => return Err(anyhow!("Expiration time (currently: {}s) must be twice bigger than refresh time (currently: {}s)", rt, et)),
|
||||||
|
_ => return Err(anyhow!("Please define refresh time and expiration time"))
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(Self{});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
12
src/gw.rs
12
src/gw.rs
|
@ -1,12 +0,0 @@
|
||||||
use igd::aio::search_gateway;
|
|
||||||
use anyhow::{Result, Context};
|
|
||||||
use log::*;
|
|
||||||
|
|
||||||
pub async fn get_gateway() -> Result<igd::aio::Gateway> {
|
|
||||||
let gw = search_gateway(Default::default())
|
|
||||||
.await
|
|
||||||
.context("Failed to find gateway")?;
|
|
||||||
|
|
||||||
info!("Gateway: {}", gw);
|
|
||||||
return Ok(gw);
|
|
||||||
}
|
|
|
@ -1,11 +1,30 @@
|
||||||
use crate::*;
|
use igd::aio::*;
|
||||||
use anyhow::Result;
|
use log::*;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use anyhow::{Result, Context};
|
||||||
|
use std::cell::Cell;
|
||||||
|
|
||||||
pub struct IgdAdapter {}
|
use crate::diplonat::*;
|
||||||
impl adapter::Adapter for IgdAdapter {
|
use crate::node_state::*;
|
||||||
fn new(&self, parent: &diplonat::Diplonat) -> Result<()> {
|
|
||||||
return Ok(());
|
pub struct IgdAdapter<'a> {
|
||||||
|
state: &'a Cell<NodeState>,
|
||||||
|
gateway: Gateway,
|
||||||
|
}
|
||||||
|
impl<'a> IgdAdapter<'a> {
|
||||||
|
pub async fn new(ns: &'a Cell<NodeState>, send: &broadcast::Sender<()>) -> Result<IgdAdapter<'a>> {
|
||||||
|
let gw = search_gateway(Default::default())
|
||||||
|
.await
|
||||||
|
.context("Failed to find gateway")?;
|
||||||
|
info!("Gateway: {}", gw);
|
||||||
|
|
||||||
|
let ctx = Self {
|
||||||
|
state: ns,
|
||||||
|
gateway: gw
|
||||||
|
};
|
||||||
|
return Ok(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run(&self) -> Result<()> {
|
fn run(&self) -> Result<()> {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
mod diplonat;
|
mod diplonat;
|
||||||
mod config;
|
mod node_state;
|
||||||
mod gw;
|
mod environment_adapter;
|
||||||
mod adapter;
|
|
||||||
mod igd_adapter;
|
mod igd_adapter;
|
||||||
|
|
||||||
//use std::net::SocketAddrV4;
|
//use std::net::SocketAddrV4;
|
||||||
//use std::collections::HashMap;
|
//use std::collections::HashMap;
|
||||||
//use igd::PortMappingProtocol;
|
//use igd::PortMappingProtocol;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use node_state::*;
|
||||||
use diplonat::*;
|
use diplonat::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
25
src/node_state.rs
Normal file
25
src/node_state.rs
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
pub struct NodeState {
|
||||||
|
pub consul_node_name: Option<String>,
|
||||||
|
pub consul_url: Option<String>,
|
||||||
|
|
||||||
|
pub refresh_time: Option<u32>,
|
||||||
|
pub expiration_time: Option<u32>,
|
||||||
|
|
||||||
|
pub public_ip: Option<String>,
|
||||||
|
pub private_ip: Option<String>,
|
||||||
|
pub public_ports: Vec<u16>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeState {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
return Self {
|
||||||
|
consul_node_name: None,
|
||||||
|
consul_url: None,
|
||||||
|
refresh_time: None,
|
||||||
|
expiration_time: None,
|
||||||
|
public_ip: None,
|
||||||
|
private_ip: None,
|
||||||
|
public_ports: Vec::new()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue