From 552fc7e5a0d623114901d4d8e8e29bfffa47e09c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 2 Feb 2023 15:17:23 +0100 Subject: [PATCH] Split into several files and make more APIs --- Cargo.toml | 4 +- examples/health-service-1.json | 206 +++++++++++++++++++++++ examples/test.rs | 59 +++++-- src/catalog.rs | 230 +++++++++++++++++++++++++ src/kv.rs | 64 +++++++ src/lib.rs | 295 +++++++-------------------------- src/locking.rs | 65 ++++++++ src/with_index.rs | 75 +++++++++ 8 files changed, 743 insertions(+), 255 deletions(-) create mode 100644 examples/health-service-1.json create mode 100644 src/catalog.rs create mode 100644 src/kv.rs create mode 100644 src/locking.rs create mode 100644 src/with_index.rs diff --git a/Cargo.toml b/Cargo.toml index a36f28b..b5860aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ serde = { version = "1.0.149", features = ["derive"] } log = "0.4" bytes = "1" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] } +tokio = { version = "1.23", default-features = false, features = [ "macros" ] } +futures = "0.3.25" [dev-dependencies] -tokio = { version = "1.22", features = ["rt", "rt-multi-thread", "macros"] } +tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/examples/health-service-1.json b/examples/health-service-1.json new file mode 100644 index 0000000..915b3a9 --- /dev/null +++ b/examples/health-service-1.json @@ -0,0 +1,206 @@ +[ + { + "Node": { + "ID": "9ccf821c-5e3e-cbe0-3727-0b0e5df35412", + "Node": "celeri", + "Address": "10.83.1.3", + "Datacenter": "prod", + "TaggedAddresses": { + "lan": "10.83.1.3", + "lan_ipv4": "10.83.1.3", + "wan": "10.83.1.3", + "wan_ipv4": "10.83.1.3" + }, + "Meta": { + "cname_target": "neptune.site.deuxfleurs.fr.", + "consul-network-segment": "", + "public_ipv4": "77.207.15.215", + "public_ipv6": "2001:910:1204:1::33", + "site": "neptune" + }, + "CreateIndex": 28797576, + "ModifyIndex": 28797584 + }, + "Service": { + "ID": "_nomad-task-c7132dd2-cc89-cbb4-5e53-6bd7c8c33677-front-https-jitsi-https_port", + "Service": "https-jitsi", + "Tags": [ + "jitsi", + "tricot jitsi.deuxfleurs.fr", + "d53-cname jitsi.deuxfleurs.fr" + ], + "Address": "10.83.1.3", + "TaggedAddresses": { + "lan_ipv4": { + "Address": "10.83.1.3", + "Port": 30140 + }, + "wan_ipv4": { + "Address": "10.83.1.3", + "Port": 30140 + } + }, + "Meta": { + "external-source": "nomad" + }, + "Port": 30140, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false, + "Proxy": { + "Mode": "", + "MeshGateway": {}, + "Expose": {} + }, + "Connect": {}, + "CreateIndex": 29922481, + "ModifyIndex": 29922481 + }, + "Checks": [ + { + "Node": "celeri", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "critical", + "Notes": "", + "Output": "Agent not live or unreachable", + "ServiceID": "", + "ServiceName": "", + "ServiceTags": [], + "Type": "", + "Interval": "", + "Timeout": "", + "ExposedPort": 0, + "Definition": {}, + "CreateIndex": 28797576, + "ModifyIndex": 32831331 + }, + { + "Node": "celeri", + "CheckID": "_nomad-check-1e8b81aa1790b51ffdf12117760d851bf00cb96f", + "Name": "service: \"https-jitsi\" check", + "Status": "passing", + "Notes": "", + "Output": "TCP connect 10.83.1.3:30140: Success", + "ServiceID": "_nomad-task-c7132dd2-cc89-cbb4-5e53-6bd7c8c33677-front-https-jitsi-https_port", + "ServiceName": "https-jitsi", + "ServiceTags": [ + "jitsi", + "tricot jitsi.deuxfleurs.fr", + "d53-cname jitsi.deuxfleurs.fr" + ], + "Type": "tcp", + "Interval": "1m0s", + "Timeout": "1m0s", + "ExposedPort": 0, + "Definition": {}, + "CreateIndex": 29922481, + "ModifyIndex": 29922617 + } + ] + }, + { + "Node": { + "ID": "cee1c827-ff32-6f38-a815-69038a961a80", + "Node": "dahlia", + "Address": "10.83.2.1", + "Datacenter": "prod", + "TaggedAddresses": { + "lan": "10.83.2.1", + "lan_ipv4": "10.83.2.1", + "wan": "10.83.2.1", + "wan_ipv4": "10.83.2.1" + }, + "Meta": { + "cname_target": "orion.site.deuxfleurs.fr.", + "consul-network-segment": "", + "public_ipv4": "82.66.80.201", + "public_ipv6": "2a01:e0a:28f:5e60::11", + "site": "orion" + }, + "CreateIndex": 31880680, + "ModifyIndex": 31880870 + }, + "Service": { + "ID": "_nomad-task-8214d936-aab2-35b4-d24e-8941a2d3b5b0-front-https-jitsi-https_port", + "Service": "https-jitsi", + "Tags": [ + "jitsi", + "tricot jitsi.deuxfleurs.fr", + "d53-cname jitsi.deuxfleurs.fr" + ], + "Address": "10.83.2.1", + "TaggedAddresses": { + "lan_ipv4": { + "Address": "10.83.2.1", + "Port": 25753 + }, + "wan_ipv4": { + "Address": "10.83.2.1", + "Port": 25753 + } + }, + "Meta": { + "external-source": "nomad" + }, + "Port": 25753, + "Weights": { + "Passing": 1, + "Warning": 1 + }, + "EnableTagOverride": false, + "Proxy": { + "Mode": "", + "MeshGateway": {}, + "Expose": {} + }, + "Connect": {}, + "CreateIndex": 33002874, + "ModifyIndex": 33002874 + }, + "Checks": [ + { + "Node": "dahlia", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "Agent alive and reachable", + "ServiceID": "", + "ServiceName": "", + "ServiceTags": [], + "Type": "", + "Interval": "", + "Timeout": "", + "ExposedPort": 0, + "Definition": {}, + "CreateIndex": 31880680, + "ModifyIndex": 31880680 + }, + { + "Node": "dahlia", + "CheckID": "_nomad-check-75542d8bde0dd9887ff8af10a02bf6a1f605695d", + "Name": "service: \"https-jitsi\" check", + "Status": "passing", + "Notes": "", + "Output": "TCP connect 10.83.2.1:25753: Success", + "ServiceID": "_nomad-task-8214d936-aab2-35b4-d24e-8941a2d3b5b0-front-https-jitsi-https_port", + "ServiceName": "https-jitsi", + "ServiceTags": [ + "jitsi", + "tricot jitsi.deuxfleurs.fr", + "d53-cname jitsi.deuxfleurs.fr" + ], + "Type": "tcp", + "Interval": "1m0s", + "Timeout": "1m0s", + "ExposedPort": 0, + "Definition": {}, + "CreateIndex": 33002874, + "ModifyIndex": 33002958 + } + ] + } +] diff --git a/examples/test.rs b/examples/test.rs index e7c34c8..ba29583 100644 --- a/examples/test.rs +++ b/examples/test.rs @@ -2,23 +2,52 @@ use df_consul::*; #[tokio::main] async fn main() { - let config = ConsulConfig { - addr: "http://localhost:8500".into(), - ca_cert: None, - tls_skip_verify: false, - client_cert: None, - client_key: None, - }; + let config = ConsulConfig { + addr: "http://localhost:8500".into(), + ca_cert: None, + tls_skip_verify: false, + client_cert: None, + client_key: None, + }; - let consul = Consul::new(config, "").unwrap(); + let consul = Consul::new(config, "").unwrap(); - println!("== LIST NODES =="); - let list_nodes = consul.list_nodes().await.unwrap(); - println!("{:?}", list_nodes); + println!("== LIST NODES =="); + let nodes = consul.catalog_node_list(None).await.unwrap(); + println!("{:?}", nodes); - println!("== CATALOG 1 =="); - println!("{:?}", consul.watch_node("caribou", None).await.unwrap()); + if let Some(node) = nodes.first() { + println!("== NODE {} ==", node.node); + println!("{:?}", consul.catalog_node(&node.node, None).await.unwrap()); + } - println!("== CATALOG 2 =="); - println!("{:?}", consul.watch_node("cariacou", None).await.unwrap()); + println!("== LIST SERVICES =="); + let services = consul.catalog_service_list(None).await.unwrap(); + println!("{:?}", services); + + if let Some(service) = services.keys().next() { + println!("== SERVICE NODES {} ==", service); + println!( + "{:?}", + consul.catalog_service_nodes(service, None).await.unwrap() + ); + + println!("== SERVICE HEALTH {} ==", service); + println!( + "{:?}", + consul + .health_service_instances(service, None) + .await + .unwrap() + ); + } + + println!("== WATCHING EVERYTHING =="); + let mut watch = consul.watch_all_service_health(); + loop { + if watch.changed().await.is_err() { + break; + } + println!("\n{:?}", watch.borrow_and_update()); + } } diff --git a/src/catalog.rs b/src/catalog.rs new file mode 100644 index 0000000..952e99e --- /dev/null +++ b/src/catalog.rs @@ -0,0 +1,230 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use futures::future::BoxFuture; +use futures::stream::futures_unordered::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use log::*; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio::sync::watch; + +use crate::{Consul, WithIndex}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulNode { + pub node: String, + pub address: String, + pub meta: HashMap, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulService { + pub service: String, + pub address: String, + pub port: u16, + pub tags: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulCatalogNode { + pub node: ConsulNode, + pub services: HashMap, +} + +pub type ConsulServiceList = HashMap>; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulServiceNode { + pub node: String, + pub address: String, + pub node_meta: HashMap, + pub service_name: String, + pub service_tags: Vec, + pub service_address: String, + pub service_port: u16, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthServiceNode { + pub node: ConsulNode, + pub service: ConsulService, + pub checks: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct ConsulHealthCheck { + pub node: String, + #[serde(rename = "CheckID")] + pub check_id: String, + pub name: String, + pub status: String, + pub output: String, + #[serde(rename = "Type")] + pub type_: String, +} + +pub type AllServiceHealth = HashMap>; + +impl Consul { + pub async fn catalog_node_list( + &self, + last_index: Option, + ) -> Result>> { + self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index) + .await + } + + pub async fn catalog_node( + &self, + host: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index) + .await + } + + pub async fn catalog_service_list( + &self, + last_index: Option, + ) -> Result> { + self.get_with_index::( + format!("{}/v1/catalog/services", self.url), + last_index, + ) + .await + } + + pub async fn catalog_service_nodes( + &self, + service: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index( + format!("{}/v1/catalog/service/{}", self.url, service), + last_index, + ) + .await + } + + pub async fn health_service_instances( + &self, + service: &str, + last_index: Option, + ) -> Result>> { + self.get_with_index( + format!("{}/v1/health/service/{}", self.url, service), + last_index, + ) + .await + } + + pub fn watch_all_service_health(&self) -> watch::Receiver { + let (tx, rx) = watch::channel(HashMap::new()); + + tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + + rx + } + + async fn get_with_index Deserialize<'de>>( + &self, + mut url: String, + last_index: Option, + ) -> Result> { + if let Some(i) = last_index { + if url.contains('?') { + write!(&mut url, "&index={}", i).unwrap(); + } else { + write!(&mut url, "?index={}", i).unwrap(); + } + } + debug!("GET {} as {}", url, std::any::type_name::()); + + let http = self.client.get(&url).send().await?; + + Ok(WithIndex::::index_from(&http)?.value(http.json().await?)) + } +} + +async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender) { + let mut services = AllServiceHealth::new(); + let mut service_watchers = FuturesUnordered::)>>::new(); + let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None)); + + loop { + select! { + list_res = &mut service_list => { + match list_res { + Ok(list) => { + let list_index = list.index(); + for service in list.into_inner().keys() { + if !services.contains_key(service) { + services.insert(service.to_string(), Arc::new([])); + + let service = service.to_string(); + service_watchers.push(Box::pin(async { + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + service_list = Box::pin(consul.catalog_service_list(Some(list_index))); + } + Err(e) => { + warn!("Error listing services: {}", e); + service_list = Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + consul.catalog_service_list(None).await + }); + } + } + } + (service, watch_res) = service_watchers.next().then(some_or_pending) => { + match watch_res { + Ok(nodes) => { + let index = nodes.index(); + services.insert(service.clone(), nodes.into_inner().into()); + + let consul = &consul; + service_watchers.push(Box::pin(async move { + let res = consul.health_service_instances(&service, Some(index)).await; + (service, res) + })); + + if tx.send(services.clone()).is_err() { + break; + } + } + Err(e) => { + warn!("Error getting service {}: {}", service, e); + service_watchers.push(Box::pin(async { + tokio::time::sleep(Duration::from_secs(30)).await; + let res = consul.health_service_instances(&service, None).await; + (service, res) + })); + } + } + } + _ = tx.closed() => { + break; + } + } + } +} + +async fn some_or_pending(value: Option) -> T { + match value { + Some(v) => v, + None => futures::future::pending().await, + } +} diff --git a/src/kv.rs b/src/kv.rs new file mode 100644 index 0000000..6c6f15a --- /dev/null +++ b/src/kv.rs @@ -0,0 +1,64 @@ +use anyhow::{anyhow, Result}; +use bytes::Bytes; +use log::*; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; + +use crate::Consul; + +impl Consul { + pub async fn kv_get(&self, key: &str) -> Result> { + debug!("kv_get {}", key); + + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); + let http = self.client.get(&url).send().await?; + match http.status() { + StatusCode::OK => Ok(Some(http.bytes().await?)), + StatusCode::NOT_FOUND => Ok(None), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), + } + } + + pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { + debug!("kv_get_json {}", key); + + let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); + let http = self.client.get(&url).send().await?; + match http.status() { + StatusCode::OK => Ok(Some(http.json().await?)), + StatusCode::NOT_FOUND => Ok(None), + _ => Err(anyhow!( + "Consul request failed: {:?}", + http.error_for_status() + )), + } + } + + pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { + debug!("kv_put {}", key); + + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) + } + + pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { + debug!("kv_put_json {}", key); + + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.put(&url).json(value).send().await?; + http.error_for_status()?; + Ok(()) + } + + pub async fn kv_delete(&self, key: &str) -> Result<()> { + let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); + let http = self.client.delete(&url).send().await?; + http.error_for_status()?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8a9ad53..6326fa0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,258 +1,75 @@ -use std::collections::HashMap; +mod catalog; +mod kv; +mod locking; +mod with_index; + use std::fs::File; use std::io::Read; -use anyhow::{anyhow, bail, Result}; -use bytes::Bytes; -use log::*; -use reqwest::StatusCode; -use serde::{Deserialize, Serialize}; +use anyhow::{bail, Result}; + +pub use with_index::WithIndex; pub struct ConsulConfig { - pub addr: String, - pub ca_cert: Option, - pub tls_skip_verify: bool, - pub client_cert: Option, - pub client_key: Option, -} - -// ---- Watch and retrieve Consul catalog ---- -// -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulNode { - #[serde(rename = "Node")] - pub node: String, - #[serde(rename = "Address")] - pub address: String, - #[serde(rename = "Meta")] - pub meta: HashMap, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulServiceEntry { - #[serde(rename = "Service")] - pub service: String, - - #[serde(rename = "Address")] - pub address: String, - - #[serde(rename = "Port")] - pub port: u16, - - #[serde(rename = "Tags")] - pub tags: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulNodeCatalog { - #[serde(rename = "Node")] - pub node: ConsulNode, - #[serde(rename = "Services")] - pub services: HashMap, -} - -// ---- Consul session management ---- - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulSessionRequest { - #[serde(rename = "Name")] - pub name: String, - - #[serde(rename = "Node")] - pub node: Option, - - #[serde(rename = "LockDelay")] - pub lock_delay: Option, - - #[serde(rename = "TTL")] - pub ttl: Option, - - #[serde(rename = "Behavior")] - pub behavior: Option, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct ConsulSessionResponse { - #[serde(rename = "ID")] - pub id: String, + pub addr: String, + pub ca_cert: Option, + pub tls_skip_verify: bool, + pub client_cert: Option, + pub client_key: Option, } #[derive(Clone)] pub struct Consul { - client: reqwest::Client, + client: reqwest::Client, - url: String, - kv_prefix: String, + url: String, + kv_prefix: String, } impl Consul { - pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result { + let client = match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + if config.tls_skip_verify { + reqwest::Client::builder() + .use_rustls_tls() + .danger_accept_invalid_certs(true) + .identity(identity) + .build()? + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? - } - } - (None, None) => reqwest::Client::new(), - _ => bail!("Incomplete Consul TLS configuration parameters"), - }; + reqwest::Client::builder() + .use_rustls_tls() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .identity(identity) + .build()? + } else { + reqwest::Client::builder() + .use_rustls_tls() + .identity(identity) + .build()? + } + } + (None, None) => reqwest::Client::new(), + _ => bail!("Incomplete Consul TLS configuration parameters"), + }; - Ok(Self { - client, - url: config.addr.trim_end_matches('/').to_string(), - kv_prefix: kv_prefix.to_string(), - }) - } - - pub async fn list_nodes(&self) -> Result> { - debug!("list_nodes"); - - let url = format!("{}/v1/catalog/nodes", self.url); - - let http = self.client.get(&url).send().await?; - let resp: Vec = http.json().await?; - Ok(resp) - } - - pub async fn watch_node( - &self, - host: &str, - idx: Option, - ) -> Result<(Option, usize)> { - debug!("watch_node {} {:?}", host, idx); - - let url = match idx { - Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), - None => format!("{}/v1/catalog/node/{}", self.url, host), - }; - - let http = self.client.get(&url).send().await?; - let new_idx = match http.headers().get("X-Consul-Index") { - Some(v) => v.to_str()?.parse::()?, - None => bail!("X-Consul-Index header not found"), - }; - - let resp: Option = http.json().await?; - Ok((resp, new_idx)) - } - - // ---- KV get and put ---- - - pub async fn kv_get(&self, key: &str) -> Result> { - debug!("kv_get {}", key); - - let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); - let http = self.client.get(&url).send().await?; - match http.status() { - StatusCode::OK => Ok(Some(http.bytes().await?)), - StatusCode::NOT_FOUND => Ok(None), - _ => Err(anyhow!( - "Consul request failed: {:?}", - http.error_for_status() - )), - } - } - - pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { - debug!("kv_get_json {}", key); - - let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); - let http = self.client.get(&url).send().await?; - match http.status() { - StatusCode::OK => Ok(Some(http.json().await?)), - StatusCode::NOT_FOUND => Ok(None), - _ => Err(anyhow!( - "Consul request failed: {:?}", - http.error_for_status() - )), - } - } - - pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { - debug!("kv_put {}", key); - - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.put(&url).body(bytes).send().await?; - http.error_for_status()?; - Ok(()) - } - - pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { - debug!("kv_put_json {}", key); - - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.put(&url).json(value).send().await?; - http.error_for_status()?; - Ok(()) - } - - pub async fn kv_delete(&self, key: &str) -> Result<()> { - let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); - let http = self.client.delete(&url).send().await?; - http.error_for_status()?; - Ok(()) - } - - // ---- Locking ---- - - pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { - debug!("create_session {:?}", req); - - let url = format!("{}/v1/session/create", self.url); - let http = self.client.put(&url).json(req).send().await?; - let resp: ConsulSessionResponse = http.json().await?; - Ok(resp.id) - } - - pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { - debug!("acquire {}", key); - - let url = format!( - "{}/v1/kv/{}{}?acquire={}", - self.url, self.kv_prefix, key, session - ); - let http = self.client.put(&url).body(bytes).send().await?; - let resp: bool = http.json().await?; - Ok(resp) - } - - pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { - debug!("release {}", key); - - let url = format!( - "{}/v1/kv/{}{}?release={}", - self.url, self.kv_prefix, key, session - ); - let http = self.client.put(&url).body(bytes).send().await?; - http.error_for_status()?; - Ok(()) - } + Ok(Self { + client, + url: config.addr.trim_end_matches('/').to_string(), + kv_prefix: kv_prefix.to_string(), + }) + } } diff --git a/src/locking.rs b/src/locking.rs new file mode 100644 index 0000000..12e9ac0 --- /dev/null +++ b/src/locking.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use bytes::Bytes; +use log::*; +use serde::{Deserialize, Serialize}; + +use crate::Consul; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionRequest { + #[serde(rename = "Name")] + pub name: String, + + #[serde(rename = "Node")] + pub node: Option, + + #[serde(rename = "LockDelay")] + pub lock_delay: Option, + + #[serde(rename = "TTL")] + pub ttl: Option, + + #[serde(rename = "Behavior")] + pub behavior: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ConsulSessionResponse { + #[serde(rename = "ID")] + pub id: String, +} + +impl Consul { + pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { + debug!("create_session {:?}", req); + + let url = format!("{}/v1/session/create", self.url); + let http = self.client.put(&url).json(req).send().await?; + let resp: ConsulSessionResponse = http.json().await?; + Ok(resp.id) + } + + pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { + debug!("acquire {}", key); + + let url = format!( + "{}/v1/kv/{}{}?acquire={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + let resp: bool = http.json().await?; + Ok(resp) + } + + pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { + debug!("release {}", key); + + let url = format!( + "{}/v1/kv/{}{}?release={}", + self.url, self.kv_prefix, key, session + ); + let http = self.client.put(&url).body(bytes).send().await?; + http.error_for_status()?; + Ok(()) + } +} diff --git a/src/with_index.rs b/src/with_index.rs new file mode 100644 index 0000000..90e06be --- /dev/null +++ b/src/with_index.rs @@ -0,0 +1,75 @@ +use std::fmt::{Debug, Display}; + +use anyhow::{bail, Result}; +use reqwest::Response; + +pub struct WithIndex { + value: T, + index: usize, +} + +impl WithIndex { + pub fn index_from(resp: &Response) -> Result> { + let index = match resp.headers().get("X-Consul-Index") { + Some(v) => v.to_str()?.parse::()?, + None => bail!("X-Consul-Index header not found"), + }; + Ok(WithIndexBuilder { + index, + _phantom: Default::default(), + }) + } + + pub fn into_inner(self) -> T { + self.value + } + + pub fn index(&self) -> usize { + self.index + } +} + +impl std::convert::AsRef for WithIndex { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl std::borrow::Borrow for WithIndex { + fn borrow(&self) -> &T { + &self.value + } +} + +impl std::ops::Deref for WithIndex { + type Target = T; + fn deref(&self) -> &T { + &self.value + } +} + +impl Debug for WithIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +impl Display for WithIndex { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +pub struct WithIndexBuilder { + _phantom: std::marker::PhantomData, + index: usize, +} + +impl WithIndexBuilder { + pub fn value(self, value: T) -> WithIndex { + WithIndex { + value, + index: self.index, + } + } +}