Locking to avoid flooding Let's encrypt

This commit is contained in:
Alex 2021-12-07 17:05:25 +01:00
parent ccb4e87658
commit bb77e7c459
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
6 changed files with 99 additions and 18 deletions

7
Cargo.lock generated
View file

@ -1799,7 +1799,6 @@ dependencies = [
"tokio 1.14.0", "tokio 1.14.0",
"tokio-rustls", "tokio-rustls",
"unicase", "unicase",
"uuid",
] ]
[[package]] [[package]]
@ -1875,12 +1874,6 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "uuid"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.15" version = "0.2.15"

View file

@ -19,7 +19,6 @@ serde_json = "1.0.53"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
bytes = "1" bytes = "1"
acme-micro = "0.12" acme-micro = "0.12"
uuid = "0.8"
rustls = "0.20" rustls = "0.20"
rustls-pemfile = "0.2" rustls-pemfile = "0.2"
chrono = { version = "0.4", features = [ "serde" ] } chrono = { version = "0.4", features = [ "serde" ] }

View file

@ -13,7 +13,7 @@ use acme_micro::{Directory, DirectoryUrl};
use rustls::sign::CertifiedKey; use rustls::sign::CertifiedKey;
use crate::cert::{Cert, CertSer}; use crate::cert::{Cert, CertSer};
use crate::consul::Consul; use crate::consul::*;
use crate::proxy_config::ProxyConfig; use crate::proxy_config::ProxyConfig;
pub struct CertStore { pub struct CertStore {
@ -86,6 +86,30 @@ impl CertStore {
pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> { pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> {
info!("Renewing certificate for {}", domain); info!("Renewing certificate for {}", domain);
// ---- Acquire lock ----
let lock_path = format!("renew_lock/{}", domain);
let lock_name = format!("tricot/renew:{}@{}", domain, self.consul.local_node.clone());
let session = self
.consul
.create_session(&ConsulSessionRequest {
name: lock_name.clone(),
node: Some(self.consul.local_node.clone()),
lock_delay: Some("30s".into()),
ttl: Some("1m".into()),
behavior: Some("delete".into()),
})
.await?;
if !self
.consul
.acquire(&lock_path, lock_name.clone().into(), &session)
.await?
{
bail!("Lock is already taken, not renewing for now.");
}
// ---- Do let's encrypt stuff ----
let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?; let dir = Directory::from_url(DirectoryUrl::LetsEncrypt)?;
let contact = vec!["mailto:alex@adnab.me".to_string()]; let contact = vec!["mailto:alex@adnab.me".to_string()];
@ -149,6 +173,7 @@ impl CertStore {
self.consul self.consul
.kv_put_json(&format!("certs/{}", domain), &certser) .kv_put_json(&format!("certs/{}", domain), &certser)
.await?; .await?;
self.consul.release(&lock_path, "".into(), &session).await?;
let cert = Arc::new(Cert::new(certser)?); let cert = Arc::new(Cert::new(certser)?);
self.certs self.certs

View file

@ -26,21 +26,52 @@ pub struct ConsulNodeCatalog {
pub services: HashMap<String, ConsulServiceEntry>, pub services: HashMap<String, ConsulServiceEntry>,
} }
// ---- Consul session management ----
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulSessionRequest {
#[serde(rename = "Name")]
pub name: String,
#[serde(rename = "Node")]
pub node: Option<String>,
#[serde(rename = "LockDelay")]
pub lock_delay: Option<String>,
#[serde(rename = "TTL")]
pub ttl: Option<String>,
#[serde(rename = "Behavior")]
pub behavior: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ConsulSessionResponse {
#[serde(rename = "ID")]
pub id: String,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Consul { pub struct Consul {
client: reqwest::Client, client: reqwest::Client,
url: String, url: String,
kv_prefix: String, kv_prefix: String,
idx: Option<u64>, idx: Option<u64>,
pub local_node: String,
} }
impl Consul { impl Consul {
pub fn new(url: &str, kv_prefix: &str) -> Self { pub fn new(url: &str, kv_prefix: &str, local_node: &str) -> Self {
return Self { return Self {
client: reqwest::Client::new(), client: reqwest::Client::new(),
url: url.to_string(), url: url.to_string(),
kv_prefix: kv_prefix.to_string(), kv_prefix: kv_prefix.to_string(),
idx: None, idx: None,
local_node: local_node.into(),
}; };
} }
@ -64,6 +95,8 @@ impl Consul {
return Ok(resp); return Ok(resp);
} }
// ---- KV get and put ----
pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> { pub async fn kv_get(&self, key: &str) -> Result<Option<Bytes>> {
debug!("kv_get {}", key); debug!("kv_get {}", key);
@ -118,4 +151,39 @@ impl Consul {
http.error_for_status()?; http.error_for_status()?;
Ok(()) Ok(())
} }
// ---- Locking ----
pub async fn create_session(&self, req: &ConsulSessionRequest) -> Result<String> {
debug!("create_session {:?}", req);
let url = format!("{}/v1/session/create", self.url);
let http = self.client.put(&url).json(req).send().await?;
let resp: ConsulSessionResponse = http.json().await?;
Ok(resp.id)
}
pub async fn acquire(&self, key: &str, bytes: Bytes, session: &str) -> Result<bool> {
debug!("acquire {}", key);
let url = format!(
"{}/v1/kv/{}{}?acquire={}",
self.url, self.kv_prefix, key, session
);
let http = self.client.put(&url).body(bytes).send().await?;
let resp: bool = http.json().await?;
Ok(resp)
}
pub async fn release(&self, key: &str, bytes: Bytes, session: &str) -> Result<()> {
debug!("release {}", key);
let url = format!(
"{}/v1/kv/{}{}?release={}",
self.url, self.kv_prefix, key, session
);
let http = self.client.put(&url).body(bytes).send().await?;
http.error_for_status()?;
Ok(())
}
} }

View file

@ -19,8 +19,8 @@ async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
info!("Starting Tricot"); info!("Starting Tricot");
let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/"); let consul = consul::Consul::new("http://10.42.0.21:8500", "tricot/", "carcajou");
let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone(), "carcajou"); let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
let cert_store = cert_store::CertStore::new(consul.clone()); let cert_store = cert_store::CertStore::new(consul.clone());
tokio::spawn( tokio::spawn(

View file

@ -82,18 +82,14 @@ fn parse_consul_catalog(catalog: &ConsulNodeCatalog) -> ProxyConfig {
ProxyConfig { entries } ProxyConfig { entries }
} }
pub fn spawn_proxy_config_task( pub fn spawn_proxy_config_task(mut consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
mut consul: Consul,
node: &str,
) -> watch::Receiver<Arc<ProxyConfig>> {
let (tx, rx) = watch::channel(Arc::new(ProxyConfig { let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
entries: Vec::new(), entries: Vec::new(),
})); }));
let node = node.to_string();
tokio::spawn(async move { tokio::spawn(async move {
let mut retries = 0; let mut retries = 0;
let node = consul.local_node.clone();
loop { loop {
let catalog = match consul.watch_node(&node).await { let catalog = match consul.watch_node(&node).await {