use std::collections::HashMap; use anyhow::Result; use bytes::Bytes; use log::*; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; // ---- Watch and retrieve Consul catalog ---- // #[derive(Serialize, Deserialize, Debug)] pub struct ConsulNode { #[serde(rename = "Node")] pub node: String, #[serde(rename = "Address")] pub address: String, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulServiceEntry { #[serde(rename = "Service")] pub service: String, #[serde(rename = "Address")] pub address: String, #[serde(rename = "Port")] pub port: u16, #[serde(rename = "Tags")] pub tags: Vec, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulNodeCatalog { #[serde(rename = "Node")] pub node: ConsulNode, #[serde(rename = "Services")] pub services: HashMap, } // ---- Consul session management ---- #[derive(Serialize, Deserialize, Debug)] pub struct ConsulSessionRequest { #[serde(rename = "Name")] pub name: String, #[serde(rename = "Node")] pub node: Option, #[serde(rename = "LockDelay")] pub lock_delay: Option, #[serde(rename = "TTL")] pub ttl: Option, #[serde(rename = "Behavior")] pub behavior: Option, } #[derive(Serialize, Deserialize, Debug)] pub struct ConsulSessionResponse { #[serde(rename = "ID")] pub id: String, } #[derive(Clone)] pub struct Consul { client: reqwest::Client, url: String, kv_prefix: String, pub local_node: String, } impl Consul { pub fn new(url: &str, kv_prefix: &str, local_node: &str) -> Self { return Self { client: reqwest::Client::new(), url: url.to_string(), kv_prefix: kv_prefix.to_string(), local_node: local_node.into(), }; } pub async fn list_nodes(&self) -> Result> { debug!("list_nodes"); let url = format!("{}/v1/catalog/nodes", self.url); let http = self.client.get(&url).send().await?; let resp: Vec = http.json().await?; Ok(resp.into_iter().map(|n| n.node).collect::>()) } pub async fn watch_node( &self, host: &str, idx: Option, ) -> Result<(ConsulNodeCatalog, usize)> { debug!("watch_node {} {:?}", host, idx); let url = match idx { Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i), None => format!("{}/v1/catalog/node/{}", self.url, host), }; let http = self.client.get(&url).send().await?; let new_idx = match http.headers().get("X-Consul-Index") { Some(v) => v.to_str()?.parse::()?, None => bail!("X-Consul-Index header not found"), }; let resp: ConsulNodeCatalog = http.json().await?; return Ok((resp, new_idx)); } // ---- KV get and put ---- pub async fn kv_get(&self, key: &str) -> Result> { debug!("kv_get {}", key); let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); let http = self.client.get(&url).send().await?; match http.status() { StatusCode::OK => Ok(Some(http.bytes().await?)), StatusCode::NOT_FOUND => Ok(None), _ => Err(anyhow!( "Consul request failed: {:?}", http.error_for_status() )), } } pub async fn kv_get_json Deserialize<'de>>(&self, key: &str) -> Result> { debug!("kv_get_json {}", key); let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key); let http = self.client.get(&url).send().await?; match http.status() { StatusCode::OK => Ok(Some(http.json().await?)), StatusCode::NOT_FOUND => Ok(None), _ => Err(anyhow!( "Consul request failed: {:?}", http.error_for_status() )), } } pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> { debug!("kv_put {}", key); let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.put(&url).body(bytes).send().await?; http.error_for_status()?; Ok(()) } pub async fn kv_put_json(&self, key: &str, value: &T) -> Result<()> { debug!("kv_put_json {}", key); let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.put(&url).json(value).send().await?; http.error_for_status()?; Ok(()) } pub async fn kv_delete(&self, key: &str) -> Result<()> { let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key); let http = self.client.delete(&url).send().await?; http.error_for_status()?; Ok(()) } // ---- Locking ---- pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result { debug!("create_session {:?}", req); let url = format!("{}/v1/session/create", self.url); let http = self.client.put(&url).json(req).send().await?; let resp: ConsulSessionResponse = http.json().await?; Ok(resp.id) } pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result { debug!("acquire {}", key); let url = format!( "{}/v1/kv/{}{}?acquire={}", self.url, self.kv_prefix, key, session ); let http = self.client.put(&url).body(bytes).send().await?; let resp: bool = http.json().await?; Ok(resp) } pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> { debug!("release {}", key); let url = format!( "{}/v1/kv/{}{}?release={}", self.url, self.kv_prefix, key, session ); let http = self.client.put(&url).body(bytes).send().await?; http.error_for_status()?; Ok(()) } }