Externalise Consul module to df-consul crate

This commit is contained in:
Alex 2022-12-07 14:28:29 +01:00
parent 752593e274
commit fad172e54a
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
6 changed files with 33 additions and 272 deletions

14
Cargo.lock generated
View file

@ -395,6 +395,19 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "df-consul"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ec5111d5daddfbab491780564dc8270c29c90553fab98d677489600a8623c76"
dependencies = [
"anyhow",
"bytes",
"log",
"reqwest",
"serde",
]
[[package]] [[package]]
name = "dhat" name = "dhat"
version = "0.3.2" version = "0.3.2"
@ -1945,6 +1958,7 @@ dependencies = [
"async-compression", "async-compression",
"bytes", "bytes",
"chrono", "chrono",
"df-consul",
"dhat", "dhat",
"envy", "envy",
"futures", "futures",

View file

@ -37,6 +37,7 @@ uuid = { version = "1.2", features = ["v4"] }
opentelemetry = "0.17" opentelemetry = "0.17"
opentelemetry-prometheus = "0.10" opentelemetry-prometheus = "0.10"
prometheus = "0.13" prometheus = "0.13"
df-consul = "0.1.0"
dhat = { version = "0.3", optional = true } dhat = { version = "0.3", optional = true }

View file

@ -20,6 +20,7 @@ use crate::proxy_config::*;
pub struct CertStore { pub struct CertStore {
consul: Consul, consul: Consul,
node_name: String,
letsencrypt_email: String, letsencrypt_email: String,
certs: RwLock<HashMap<String, Arc<Cert>>>, certs: RwLock<HashMap<String, Arc<Cert>>>,
self_signed_certs: RwLock<HashMap<String, Arc<Cert>>>, self_signed_certs: RwLock<HashMap<String, Arc<Cert>>>,
@ -30,6 +31,7 @@ pub struct CertStore {
impl CertStore { impl CertStore {
pub fn new( pub fn new(
consul: Consul, consul: Consul,
node_name: String,
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>, rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
letsencrypt_email: String, letsencrypt_email: String,
exit_on_err: impl Fn(anyhow::Error) + Send + 'static, exit_on_err: impl Fn(anyhow::Error) + Send + 'static,
@ -38,6 +40,7 @@ impl CertStore {
let cert_store = Arc::new(Self { let cert_store = Arc::new(Self {
consul, consul,
node_name,
certs: RwLock::new(HashMap::new()), certs: RwLock::new(HashMap::new()),
self_signed_certs: RwLock::new(HashMap::new()), self_signed_certs: RwLock::new(HashMap::new()),
rx_proxy_config, rx_proxy_config,
@ -190,7 +193,7 @@ impl CertStore {
// that delay expires // that delay expires
let lock_path = format!("renew_lock/{}", domain); let lock_path = format!("renew_lock/{}", domain);
let lock_name = format!("tricot/renew:{}@{}", domain, self.consul.local_node.clone()); let lock_name = format!("tricot/renew:{}@{}", domain, self.node_name);
let session = self let session = self
.consul .consul
.create_session(&ConsulSessionRequest { .create_session(&ConsulSessionRequest {

View file

@ -1,261 +0,0 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
pub struct ConsulConfig {
pub addr: String,
pub ca_cert: Option<String>,
pub tls_skip_verify: bool,
pub client_cert: Option<String>,
pub client_key: Option<String>,
}
// ---- Watch and retrieve Consul catalog ----
//
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulNode {
#[serde(rename = "Node")]
pub node: String,
#[serde(rename = "Address")]
pub address: String,
#[serde(rename = "Meta")]
pub meta: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulServiceEntry {
#[serde(rename = "Service")]
pub service: String,
#[serde(rename = "Address")]
pub address: String,
#[serde(rename = "Port")]
pub port: u16,
#[serde(rename = "Tags")]
pub tags: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulNodeCatalog {
#[serde(rename = "Node")]
pub node: ConsulNode,
#[serde(rename = "Services")]
pub services: HashMap<String, ConsulServiceEntry>,
}
// ---- Consul session management ----
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulSessionRequest {
#[serde(rename = "Name")]
pub name: String,
#[serde(rename = "Node")]
pub node: Option<String>,
#[serde(rename = "LockDelay")]
pub lock_delay: Option<String>,
#[serde(rename = "TTL")]
pub ttl: Option<String>,
#[serde(rename = "Behavior")]
pub behavior: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulSessionResponse {
#[serde(rename = "ID")]
pub id: String,
}
#[derive(Clone)]
pub struct Consul {
client: reqwest::Client,
url: String,
kv_prefix: String,
pub local_node: String,
}
impl Consul {
pub fn new(config: ConsulConfig, kv_prefix: &str, local_node: &str) -> Result<Self> {
let client = match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let mut client_key_buf = vec![];
File::open(client_key)?.read_to_end(&mut client_key_buf)?;
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
if config.tls_skip_verify {
reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
}
}
(None, None) => reqwest::Client::new(),
_ => bail!("Incomplete Consul TLS configuration parameters"),
};
Ok(Self {
client,
url: config.addr.trim_end_matches('/').to_string(),
kv_prefix: kv_prefix.to_string(),
local_node: local_node.into(),
})
}
pub async fn list_nodes(&self) -> Result<Vec<ConsulNode>> {
debug!("list_nodes");
let url = format!("{}/v1/catalog/nodes", self.url);
let http = self.client.get(&url).send().await?;
let resp: Vec<ConsulNode> = http.json().await?;
Ok(resp)
}
pub async fn watch_node(
&self,
host: &str,
idx: Option<usize>,
) -> Result<(ConsulNodeCatalog, usize)> {
debug!("watch_node {} {:?}", host, idx);
let url = match idx {
Some(i) => format!("{}/v1/catalog/node/{}?index={}", self.url, host, i),
None => format!("{}/v1/catalog/node/{}", self.url, host),
};
let http = self.client.get(&url).send().await?;
let new_idx = match http.headers().get("X-Consul-Index") {
Some(v) => v.to_str()?.parse::<usize>()?,
None => bail!("X-Consul-Index header not found"),
};
let resp: ConsulNodeCatalog = http.json().await?;
Ok((resp, new_idx))
}
// ---- KV get and put ----
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
debug!("kv_get {}", key);
let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
let http = self.client.get(&url).send().await?;
match http.status() {
StatusCode::OK => Ok(Some(http.bytes().await?)),
StatusCode::NOT_FOUND => Ok(None),
_ => Err(anyhow!(
"Consul request failed: {:?}",
http.error_for_status()
)),
}
}
pub async fn kv_get_json<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<Option<T>> {
debug!("kv_get_json {}", key);
let url = format!("{}/v1/kv/{}{}?raw", self.url, self.kv_prefix, key);
let http = self.client.get(&url).send().await?;
match http.status() {
StatusCode::OK => Ok(Some(http.json().await?)),
StatusCode::NOT_FOUND => Ok(None),
_ => Err(anyhow!(
"Consul request failed: {:?}",
http.error_for_status()
)),
}
}
pub async fn kv_put(&self, key: &str, bytes: Bytes) -> Result<()> {
debug!("kv_put {}", key);
let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
let http = self.client.put(&url).body(bytes).send().await?;
http.error_for_status()?;
Ok(())
}
pub async fn kv_put_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
debug!("kv_put_json {}", key);
let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
let http = self.client.put(&url).json(value).send().await?;
http.error_for_status()?;
Ok(())
}
pub async fn kv_delete(&self, key: &str) -> Result<()> {
let url = format!("{}/v1/kv/{}{}", self.url, self.kv_prefix, key);
let http = self.client.delete(&url).send().await?;
http.error_for_status()?;
Ok(())
}
// ---- Locking ----
pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
debug!("create_session {:?}", req);
let url = format!("{}/v1/session/create", self.url);
let http = self.client.put(&url).json(req).send().await?;
let resp: ConsulSessionResponse = http.json().await?;
Ok(resp.id)
}
pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
debug!("acquire {}", key);
let url = format!(
"{}/v1/kv/{}{}?acquire={}",
self.url, self.kv_prefix, key, session
);
let http = self.client.put(&url).body(bytes).send().await?;
let resp: bool = http.json().await?;
Ok(resp)
}
pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
debug!("release {}", key);
let url = format!(
"{}/v1/kv/{}{}?release={}",
self.url, self.kv_prefix, key, session
);
let http = self.client.put(&url).body(bytes).send().await?;
http.error_for_status()?;
Ok(())
}
}

View file

@ -13,7 +13,6 @@ use tokio::sync::watch;
mod cert; mod cert;
mod cert_store; mod cert_store;
mod consul;
mod http; mod http;
mod https; mod https;
mod metrics; mod metrics;
@ -21,6 +20,7 @@ mod proxy_config;
mod reverse_proxy; mod reverse_proxy;
mod tls_util; mod tls_util;
pub use df_consul as consul;
use proxy_config::ProxyConfig; use proxy_config::ProxyConfig;
#[cfg(feature = "dhat-heap")] #[cfg(feature = "dhat-heap")]
@ -139,13 +139,17 @@ async fn main() {
client_key: opt.consul_client_key.clone(), client_key: opt.consul_client_key.clone(),
}; };
let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name) let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix)
.expect("Error creating Consul client"); .expect("Error creating Consul client");
let rx_proxy_config = let rx_proxy_config = proxy_config::spawn_proxy_config_task(
proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone()); consul.clone(),
opt.node_name.clone(),
exit_signal.clone(),
);
let cert_store = cert_store::CertStore::new( let cert_store = cert_store::CertStore::new(
consul.clone(), consul.clone(),
opt.node_name.clone(),
rx_proxy_config.clone(), rx_proxy_config.clone(),
opt.letsencrypt_email.clone(), opt.letsencrypt_email.clone(),
exit_on_err.clone(), exit_on_err.clone(),

View file

@ -252,6 +252,7 @@ struct NodeWatchState {
pub fn spawn_proxy_config_task( pub fn spawn_proxy_config_task(
consul: Consul, consul: Consul,
local_node: String,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
) -> watch::Receiver<Arc<ProxyConfig>> { ) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig { let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
@ -354,12 +355,11 @@ pub fn spawn_proxy_config_task(
let mut entries = vec![]; let mut entries = vec![];
for (node_name, watch_state) in nodes.iter() { for (node_name, watch_state) in nodes.iter() {
if let Some(catalog) = &watch_state.last_catalog { if let Some(catalog) = &watch_state.last_catalog {
let same_node = *node_name == consul.local_node; let same_node = *node_name == local_node;
let same_site = let same_site = match (node_site.get(node_name), node_site.get(&local_node)) {
match (node_site.get(node_name), node_site.get(&consul.local_node)) { (Some(s1), Some(s2)) => s1 == s2,
(Some(s1), Some(s2)) => s1 == s2, _ => false,
_ => false, };
};
entries.extend(parse_consul_catalog(catalog, same_node, same_site)); entries.extend(parse_consul_catalog(catalog, same_node, same_site));
} }