Working parsing
This commit is contained in:
parent
4cebe77b6f
commit
453b1c684e
5 changed files with 78 additions and 11 deletions
34
Cargo.lock
generated
34
Cargo.lock
generated
|
@ -131,6 +131,7 @@ dependencies = [
|
||||||
"pretty_env_logger",
|
"pretty_env_logger",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde-lexpr",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
@ -522,6 +523,29 @@ version = "1.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lexpr"
|
||||||
|
version = "0.2.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2ab101919962a12ffdaf7170f0943715a9d47f35a9753986028c305183bbf1a6"
|
||||||
|
dependencies = [
|
||||||
|
"itoa",
|
||||||
|
"lexpr-macros",
|
||||||
|
"proc-macro-hack",
|
||||||
|
"ryu",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lexpr-macros"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cd627fb38e19c00d8d068618259205f7a91c91aeade5c15bc35dbca037bb1c35"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro-hack",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.66"
|
version = "0.2.66"
|
||||||
|
@ -974,6 +998,16 @@ dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde-lexpr"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "143e5e658ac3a7374bdf285b9355cab74dd144293b86c9be27eab39452239d41"
|
||||||
|
dependencies = [
|
||||||
|
"lexpr",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.107"
|
version = "1.0.107"
|
||||||
|
|
|
@ -15,4 +15,5 @@ tokio = "0.2.11"
|
||||||
futures = "0.3.5"
|
futures = "0.3.5"
|
||||||
serde = { version = "1.0.107", features = ["derive"] }
|
serde = { version = "1.0.107", features = ["derive"] }
|
||||||
serde_json = "1.0.53"
|
serde_json = "1.0.53"
|
||||||
|
serde-lexpr = "0.1.1"
|
||||||
anyhow = "1.0.28"
|
anyhow = "1.0.28"
|
||||||
|
|
|
@ -59,6 +59,6 @@ To test the Consul Catalog part, you can do:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
consul agent -dev #in a separate terminal, if not already running
|
consul agent -dev #in a separate terminal, if not already running
|
||||||
consul services register -name=example -port=1337 -tag="diplonat_pub_port=1337"
|
consul services register -name=example -port=1337 -tag="(diplonat ((port 1337) (port 1338)))"
|
||||||
consul services -id=example
|
consul services -id=example
|
||||||
```
|
```
|
||||||
|
|
|
@ -4,12 +4,12 @@ use anyhow::{Result, anyhow};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct ServiceEntry {
|
pub struct ServiceEntry {
|
||||||
Tags: Vec<String>
|
pub Tags: Vec<String>
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct CatalogNode {
|
pub struct CatalogNode {
|
||||||
Services: HashMap<String, ServiceEntry>
|
pub Services: HashMap<String, ServiceEntry>
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Consul {
|
pub struct Consul {
|
||||||
|
|
|
@ -1,16 +1,28 @@
|
||||||
use crate::consul::Consul;
|
|
||||||
use tokio::sync::watch;
|
|
||||||
use tokio::time::delay_for;
|
|
||||||
use crate::messages;
|
|
||||||
use anyhow::Result;
|
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
use tokio::time::delay_for;
|
||||||
|
use anyhow::Result;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use serde_lexpr::{from_str,to_string,error};
|
||||||
|
use crate::messages;
|
||||||
|
use crate::consul;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub enum DiplonatParameter {
|
||||||
|
port(Vec<u16>)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub enum DiplonatConsul {
|
||||||
|
diplonat(Vec<DiplonatParameter>)
|
||||||
|
}
|
||||||
|
|
||||||
pub struct ConsulActor {
|
pub struct ConsulActor {
|
||||||
pub rx_open_ports: watch::Receiver<messages::OpenPorts>,
|
pub rx_open_ports: watch::Receiver<messages::OpenPorts>,
|
||||||
|
|
||||||
consul: Consul,
|
consul: consul::Consul,
|
||||||
node: String,
|
node: String,
|
||||||
retries: u32,
|
retries: u32,
|
||||||
tx_open_ports: watch::Sender<messages::OpenPorts>
|
tx_open_ports: watch::Sender<messages::OpenPorts>
|
||||||
|
@ -22,11 +34,31 @@ fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
|
||||||
return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64))
|
return Duration::from_secs(cmp::max(max_time.as_secs(), 1.2f64.powf(retries as f64) as u64))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn from_catalog_to_open_ports(catalog: &consul::CatalogNode) -> messages::OpenPorts {
|
||||||
|
let mut op = messages::OpenPorts { ports: Vec::new() };
|
||||||
|
for (_, service_info) in &catalog.Services {
|
||||||
|
for tag in &service_info.Tags {
|
||||||
|
let diplo_conf: error::Result<DiplonatConsul> = from_str(tag);
|
||||||
|
match diplo_conf {
|
||||||
|
Ok(conf) => {
|
||||||
|
let DiplonatConsul::diplonat(c) = conf;
|
||||||
|
for parameter in &c {
|
||||||
|
let DiplonatParameter::port(p) = parameter;
|
||||||
|
op.ports.extend(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => debug!("Failed to parse entry {}. {}", tag, e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return op;
|
||||||
|
}
|
||||||
|
|
||||||
impl ConsulActor {
|
impl ConsulActor {
|
||||||
pub fn new(url: &str, node: &str) -> Self {
|
pub fn new(url: &str, node: &str) -> Self {
|
||||||
let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() });
|
let (tx, rx) = watch::channel(messages::OpenPorts{ports: Vec::new() });
|
||||||
return Self {
|
return Self {
|
||||||
consul: Consul::new(url),
|
consul: consul::Consul::new(url),
|
||||||
rx_open_ports: rx,
|
rx_open_ports: rx,
|
||||||
tx_open_ports: tx,
|
tx_open_ports: tx,
|
||||||
node: node.to_string(),
|
node: node.to_string(),
|
||||||
|
@ -47,7 +79,7 @@ impl ConsulActor {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("{:#?}", catalog);
|
info!("{:#?}", from_catalog_to_open_ports(&catalog));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue