use std::collections::HashMap; use std::fs::File; use std::io::Read; use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use log::*; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; pub struct ConsulConfig { pub addr: String, pub ca_cert: Option, pub tls_skip_verify: bool, pub client_cert: Option, pub client_key: Option, } // ---- Watch and retrieve Consul catalog ---- // #[derive(Serialize, Deserialize, Debug)] pub struct ConsulNode { #[serde(rename = "Node")] pub node: String, #[serde(rename = "Address")] pub address: String, #[serde(rename = "Meta")] pub meta: HashMap, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulServiceEntry { #[serde(rename = "Service")] pub service: String, #[serde(rename = "Address")] pub address: String, #[serde(rename = "Port")] pub port: u16, #[serde(rename = "Tags")] pub tags: Vec, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulNodeCatalog { #[serde(rename = "Node")] pub node: ConsulNode, #[serde(rename = "Services")] pub services: HashMap, } // ---- Consul session management ---- #[derive(Serialize, Deserialize, Debug)] pub struct ConsulSessionRequest { #[serde(rename = "Name")] pub name: String, #[serde(rename = "Node")] pub node: Option, #[serde(rename = "LockDelay")] pub lock_delay: Option, #[serde(rename = "TTL")] pub ttl: Option, #[serde(rename = "Behavior")] pub behavior: Option, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulSessionResponse { #[serde(rename = "ID")] pub id: String, } #[derive(Clone)] pub struct Consul { client: reqwest::Client, url: String, kv_prefix: String, } impl Consul { pub fn new(config: ConsulConfig, kv_prefix: &str) -> Result { let client = match (&config.client_cert, &config.client_key) { (Some(client_cert), Some(client_key)) => { let mut client_cert_buf = vec![]; File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; let mut client_key_buf = vec![]; File::open(client_key)?.read_to_end(&mut client_key_buf)?; let identity = reqwest::Identity::from_pem( &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], )?; if config.tls_skip_verify { reqwest::Client::builder() .use_rustls_tls() .danger_accept_invalid_certs(true) .identity(identity) .build()? } else if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; reqwest::Client::builder() .use_rustls_tls() .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) .identity(identity) .build()? } else { reqwest::Client::builder() .use_rustls_tls() .identity(identity) .build()? } } (None, None) => reqwest::Client::new(), _ => bail!("Incomplete Consul TLS configuration parameters"), }; Ok(Self { client, url: config.addr.trim_end_matches('/').to_string(), kv_prefix: kv_prefix.to_string(), }) } pub async fn list_nodes(&self) -> Result> { debug!("list_nodes"); let url = format!("{}/v1/catalog/nodes", self.url); let http = self.client.get(&url).send().await?; let resp: Vec = http.json().await?; Ok(resp) } pub async fn watch_node( &self, host: &str, idx: Option, ) -> Result<(ConsulNodeCatalog, usize)> { debug!("watch_node {} {:?}", host, idx); let url = match idx { Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), None => format!("{}/v1/catalog/node/{}", self.url, host), }; let http = self.client.get(&url).send().await?; let new_idx = match http.headers().get("X-Consul-Index") { Some(v) => v.to_str()?.parse::()?, None => bail!("X-Consul-Index header not found"), }; let resp: ConsulNodeCatalog = http.json().await?; Ok((resp, new_idx)) } // ---- KV get and put ---- pub async fn kv_get(&self, key: &str) -> Result> { debug!("kv_get {}", key); let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); let http = self.client.get(&url).send().await?; match http.status() { StatusCode::OK => Ok(Some(http.bytes().await?)), StatusCode::NOT_FOUND => Ok(None), _ => Err(anyhow!( "Consul request failed: {:?}", http.error_for_status() )), } } pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { debug!("kv_get_json {}", key); let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); let http = self.client.get(&url).send().await?; match http.status() { StatusCode::OK => Ok(Some(http.json().await?)), StatusCode::NOT_FOUND => Ok(None), _ => Err(anyhow!( "Consul request failed: {:?}", http.error_for_status() )), } } pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { debug!("kv_put {}", key); let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.put(&url).body(bytes).send().await?; http.error_for_status()?; Ok(()) } pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { debug!("kv_put_json {}", key); let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.put(&url).json(value).send().await?; http.error_for_status()?; Ok(()) } pub async fn kv_delete(&self, key: &str) -> Result<()> { let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.delete(&url).send().await?; http.error_for_status()?; Ok(()) } // ---- Locking ---- pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { debug!("create_session {:?}", req); let url = format!("{}/v1/session/create", self.url); let http = self.client.put(&url).json(req).send().await?; let resp: ConsulSessionResponse = http.json().await?; Ok(resp.id) } pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { debug!("acquire {}", key); let url = format!( "{}/v1/kv/{}{}?acquire={}", self.url, self.kv_prefix, key, session ); let http = self.client.put(&url).body(bytes).send().await?; let resp: bool = http.json().await?; Ok(resp) } pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { debug!("release {}", key); let url = format!( "{}/v1/kv/{}{}?release={}", self.url, self.kv_prefix, key, session ); let http = self.client.put(&url).body(bytes).send().await?; http.error_for_status()?; Ok(()) } }