Compare commits
23 commits
Author | SHA1 | Date | |
---|---|---|---|
9bb505d977 | |||
dff06115cc | |||
d31212e56b | |||
5a326222c4 | |||
388d5b2275 | |||
b9b035034f | |||
753903ee02 | |||
ca449ebff4 | |||
b04c2bfb0a | |||
b76b6dcbcc | |||
2b3f934247 | |||
f11592926b | |||
de72d6037f | |||
14325395f6 | |||
e943fd3772 | |||
ec247ec4c6 | |||
f211085cd2 | |||
4818e8cccc | |||
46d517b2f7 | |||
f54ab6e51c | |||
30ce3a97c9 | |||
a3602eac82 | |||
b5e8d1fcd8 |
15 changed files with 2406 additions and 2127 deletions
|
@ -1,9 +1,10 @@
|
||||||
---
|
when:
|
||||||
kind: pipeline
|
event:
|
||||||
name: default
|
- push
|
||||||
|
- pull_request
|
||||||
node:
|
- tag
|
||||||
nix-daemon: 1
|
- cron
|
||||||
|
- manual
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: check formatting
|
- name: check formatting
|
||||||
|
@ -23,18 +24,3 @@ steps:
|
||||||
commands:
|
commands:
|
||||||
- nix build --extra-experimental-features nix-command --extra-experimental-features flakes .#test.x86_64-linux.tricot
|
- nix build --extra-experimental-features nix-command --extra-experimental-features flakes .#test.x86_64-linux.tricot
|
||||||
- ./result-bin/bin/tricot-*
|
- ./result-bin/bin/tricot-*
|
||||||
|
|
||||||
trigger:
|
|
||||||
event:
|
|
||||||
- custom
|
|
||||||
- push
|
|
||||||
- pull_request
|
|
||||||
- tag
|
|
||||||
- cron
|
|
||||||
|
|
||||||
|
|
||||||
---
|
|
||||||
kind: signature
|
|
||||||
hmac: 49cde53ec25364cc3b3f041092c8e658fe9252342253757d86814ca12d5cb0f7
|
|
||||||
|
|
||||||
...
|
|
1166
Cargo.lock
generated
1166
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
18
Cargo.toml
18
Cargo.toml
|
@ -10,8 +10,8 @@ authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||||
anyhow = "1.0.66"
|
anyhow = "1.0.66"
|
||||||
envy = "0.4"
|
envy = "0.4"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
log = "0.4"
|
tracing = { version = "0.1.30" }
|
||||||
pretty_env_logger = "0.4"
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
regex = "1"
|
regex = "1"
|
||||||
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] }
|
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-manual-roots" ] }
|
||||||
serde = { version = "1.0.149", features = ["derive"] }
|
serde = { version = "1.0.149", features = ["derive"] }
|
||||||
|
@ -19,23 +19,23 @@ serde_json = "1.0.89"
|
||||||
tokio = { version = "1.22", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
tokio = { version = "1.22", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
acme-micro = "0.12"
|
acme-micro = "0.12"
|
||||||
rustls = { version = "0.20", features = [ "dangerous_configuration" ] }
|
rustls = { version = "0.21", features = [ "dangerous_configuration" ] }
|
||||||
rustls-pemfile = "1.0"
|
rustls-pemfile = "1.0"
|
||||||
chrono = { version = "0.4", features = [ "serde" ] }
|
chrono = { version = "0.4", features = [ "serde" ] }
|
||||||
hyper = { version = "0.14", features = [ "full" ] }
|
hyper = { version = "0.14", features = [ "full" ] }
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
tokio-rustls = "0.23"
|
tokio-rustls = "0.24"
|
||||||
hyper-rustls = "0.23"
|
hyper-rustls = "0.24"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
structopt = "0.3"
|
structopt = "0.3"
|
||||||
glob = "0.3"
|
glob = "0.3"
|
||||||
rcgen = "0.10"
|
rcgen = "0.11"
|
||||||
accept-encoding-fork = "0.2.0-alpha.3"
|
accept-encoding-fork = "0.2.0-alpha.3"
|
||||||
async-compression = { version = "0.3", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] }
|
async-compression = { version = "0.4", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] }
|
||||||
tokio-util = { version = "0.7", features = ["io"] }
|
tokio-util = { version = "0.7", features = ["io"] }
|
||||||
uuid = { version = "1.2", features = ["v4"] }
|
uuid = { version = "1.2", features = ["v4"] }
|
||||||
opentelemetry = "0.17"
|
opentelemetry = "0.20"
|
||||||
opentelemetry-prometheus = "0.10"
|
opentelemetry-prometheus = "0.13"
|
||||||
prometheus = "0.13"
|
prometheus = "0.13"
|
||||||
df-consul = "0.3.5"
|
df-consul = "0.3.5"
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM rust:1.65-buster as builder
|
FROM rust:1.68-buster as builder
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y libssl-dev pkg-config
|
apt-get install -y libssl-dev pkg-config
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
# Tricot
|
# Tricot
|
||||||
|
|
||||||
[![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/tricot/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/tricot)
|
[![status-badge](https://woodpecker.deuxfleurs.fr/api/badges/36/status.svg)](https://woodpecker.deuxfleurs.fr/repos/36)
|
||||||
|
|
||||||
Tricot is a reverse-proxy for exposing your services via TLS that integrates well with Consul and Nomad.
|
Tricot is a reverse-proxy for exposing your services via TLS that integrates well with Consul and Nomad.
|
||||||
|
|
||||||
|
@ -42,6 +42,7 @@ Backends are configured by adding tags of the following form to the services in
|
||||||
- `tricot myapp.example.com/path/to_subresource 10`: combining the previous two examples
|
- `tricot myapp.example.com/path/to_subresource 10`: combining the previous two examples
|
||||||
- `tricot-https myapp.example.com`: same, but indicates that the backend service handling the request expects an HTTPS request and not an HTTP request. In this case, Tricot will do everything in its power to NOT verify the backend's TLS certificate (ignore self-signed certificate, ignore TLS hostname, etc).
|
- `tricot-https myapp.example.com`: same, but indicates that the backend service handling the request expects an HTTPS request and not an HTTP request. In this case, Tricot will do everything in its power to NOT verify the backend's TLS certificate (ignore self-signed certificate, ignore TLS hostname, etc).
|
||||||
- `tricot-add-header Access-Control-Allow-Origin *`: add the `Access-Control-Allow-Origin: *` header to all of the HTTP responses when they are proxied back to the client
|
- `tricot-add-header Access-Control-Allow-Origin *`: add the `Access-Control-Allow-Origin: *` header to all of the HTTP responses when they are proxied back to the client
|
||||||
|
- `tricot-add-redirect old.example.com/maybe_subpath new.example.com/new/subpath 301`: redirects paths that match the first pattern to the second pattern with the given HTTP status code. More info in [PR#10](https://git.deuxfleurs.fr/Deuxfleurs/tricot/pulls/10).
|
||||||
- `tricot-global-lb`: load-balance incoming requests to all matching backends
|
- `tricot-global-lb`: load-balance incoming requests to all matching backends
|
||||||
- `tricot-site-lb`: load-balance incoming requests to all matching backends that are in the same site (geographical location); when site information about nodes is not available, this is equivalent to `tricot-global-lb`
|
- `tricot-site-lb`: load-balance incoming requests to all matching backends that are in the same site (geographical location); when site information about nodes is not available, this is equivalent to `tricot-global-lb`
|
||||||
|
|
||||||
|
|
67
flake.lock
67
flake.lock
|
@ -55,7 +55,41 @@
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"flake-utils_2": {
|
||||||
|
"inputs": {
|
||||||
|
"systems": "systems"
|
||||||
|
},
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1694529238,
|
||||||
|
"narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1696234590,
|
||||||
|
"narHash": "sha256-mgOzQYTvaTT4bFopVOadlndy2RPwLy60rDjIWOGujwo=",
|
||||||
|
"owner": "NixOS",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"rev": "f902cb49892d300ff15cb237e48aa1cad79d68c3",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "NixOS",
|
||||||
|
"ref": "nixpkgs-unstable",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nixpkgs_2": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1665657542,
|
"lastModified": 1665657542,
|
||||||
"narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=",
|
"narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=",
|
||||||
|
@ -74,26 +108,20 @@
|
||||||
"root": {
|
"root": {
|
||||||
"inputs": {
|
"inputs": {
|
||||||
"cargo2nix": "cargo2nix",
|
"cargo2nix": "cargo2nix",
|
||||||
"nixpkgs": "nixpkgs"
|
"nixpkgs": "nixpkgs_2"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"rust-overlay": {
|
"rust-overlay": {
|
||||||
"inputs": {
|
"inputs": {
|
||||||
"flake-utils": [
|
"flake-utils": "flake-utils_2",
|
||||||
"cargo2nix",
|
"nixpkgs": "nixpkgs"
|
||||||
"flake-utils"
|
|
||||||
],
|
|
||||||
"nixpkgs": [
|
|
||||||
"cargo2nix",
|
|
||||||
"nixpkgs"
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1664247556,
|
"lastModified": 1682389182,
|
||||||
"narHash": "sha256-J4vazHU3609ekn7dr+3wfqPo5WGlZVAgV7jfux352L0=",
|
"narHash": "sha256-8t2nmFnH+8V48+IJsf8AK51ebXNlVbOSVYOpiqJKvJE=",
|
||||||
"owner": "oxalica",
|
"owner": "oxalica",
|
||||||
"repo": "rust-overlay",
|
"repo": "rust-overlay",
|
||||||
"rev": "524db9c9ea7bc7743bb74cdd45b6d46ea3fcc2ab",
|
"rev": "74f1a64dd28faeeb85ef081f32cad2989850322c",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
@ -101,6 +129,21 @@
|
||||||
"repo": "rust-overlay",
|
"repo": "rust-overlay",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"systems": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1681028828,
|
||||||
|
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"root": "root",
|
"root": "root",
|
||||||
|
|
|
@ -5,6 +5,11 @@
|
||||||
inputs.cargo2nix = {
|
inputs.cargo2nix = {
|
||||||
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
||||||
url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36";
|
url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36";
|
||||||
|
|
||||||
|
# Rust overlay as of 2023-04-25
|
||||||
|
inputs.rust-overlay.url =
|
||||||
|
"github:oxalica/rust-overlay/74f1a64dd28faeeb85ef081f32cad2989850322c";
|
||||||
|
|
||||||
inputs.nixpkgs.follows = "nixpkgs";
|
inputs.nixpkgs.follows = "nixpkgs";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -15,7 +20,7 @@
|
||||||
overlays = [ cargo2nix.overlays.default ];
|
overlays = [ cargo2nix.overlays.default ];
|
||||||
};
|
};
|
||||||
packageFun = import ./Cargo.nix;
|
packageFun = import ./Cargo.nix;
|
||||||
rustVersion = "1.63.0";
|
rustVersion = "1.68.0";
|
||||||
|
|
||||||
compile = args: compileMode:
|
compile = args: compileMode:
|
||||||
let
|
let
|
||||||
|
|
|
@ -5,10 +5,10 @@ use std::time::{Duration, Instant};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use futures::{FutureExt, TryFutureExt};
|
use futures::{FutureExt, TryFutureExt};
|
||||||
use log::*;
|
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
use tokio::task::block_in_place;
|
use tokio::task::block_in_place;
|
||||||
|
use tracing::*;
|
||||||
|
|
||||||
use acme_micro::create_p384_key;
|
use acme_micro::create_p384_key;
|
||||||
use acme_micro::{Directory, DirectoryUrl};
|
use acme_micro::{Directory, DirectoryUrl};
|
||||||
|
@ -22,12 +22,19 @@ pub struct CertStore {
|
||||||
consul: Consul,
|
consul: Consul,
|
||||||
node_name: String,
|
node_name: String,
|
||||||
letsencrypt_email: String,
|
letsencrypt_email: String,
|
||||||
|
|
||||||
certs: RwLock<HashMap<String, Arc<Cert>>>,
|
certs: RwLock<HashMap<String, Arc<Cert>>>,
|
||||||
self_signed_certs: RwLock<HashMap<String, Arc<Cert>>>,
|
self_signed_certs: RwLock<HashMap<String, Arc<Cert>>>,
|
||||||
|
|
||||||
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
tx_need_cert: mpsc::UnboundedSender<String>,
|
tx_need_cert: mpsc::UnboundedSender<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ProcessedDomains {
|
||||||
|
static_domains: HashSet<String>,
|
||||||
|
on_demand_domains: Vec<(glob::Pattern, Option<String>)>,
|
||||||
|
}
|
||||||
|
|
||||||
impl CertStore {
|
impl CertStore {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
consul: Consul,
|
consul: Consul,
|
||||||
|
@ -41,10 +48,10 @@ impl CertStore {
|
||||||
let cert_store = Arc::new(Self {
|
let cert_store = Arc::new(Self {
|
||||||
consul,
|
consul,
|
||||||
node_name,
|
node_name,
|
||||||
|
letsencrypt_email,
|
||||||
certs: RwLock::new(HashMap::new()),
|
certs: RwLock::new(HashMap::new()),
|
||||||
self_signed_certs: RwLock::new(HashMap::new()),
|
self_signed_certs: RwLock::new(HashMap::new()),
|
||||||
rx_proxy_config,
|
rx_proxy_config,
|
||||||
letsencrypt_email,
|
|
||||||
tx_need_cert: tx,
|
tx_need_cert: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -66,46 +73,72 @@ impl CertStore {
|
||||||
let mut rx_proxy_config = self.rx_proxy_config.clone();
|
let mut rx_proxy_config = self.rx_proxy_config.clone();
|
||||||
|
|
||||||
let mut t_last_check: HashMap<String, Instant> = HashMap::new();
|
let mut t_last_check: HashMap<String, Instant> = HashMap::new();
|
||||||
|
let mut proc_domains: Option<ProcessedDomains> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut domains: HashSet<String> = HashSet::new();
|
let domains = select! {
|
||||||
|
// Refresh some internal states, schedule static_domains for renew
|
||||||
select! {
|
|
||||||
res = rx_proxy_config.changed() => {
|
res = rx_proxy_config.changed() => {
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
bail!("rx_proxy_config closed");
|
bail!("rx_proxy_config closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut static_domains: HashSet<String> = HashSet::new();
|
||||||
|
let mut on_demand_domains: Vec<(glob::Pattern, Option<String>)> = vec![];
|
||||||
|
|
||||||
let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
|
let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
|
||||||
|
|
||||||
for ent in proxy_config.entries.iter() {
|
for ent in proxy_config.entries.iter() {
|
||||||
if let HostDescription::Hostname(domain) = &ent.host {
|
// Eagerly generate certificates for domains that
|
||||||
|
// are not patterns
|
||||||
|
match &ent.url_prefix.host {
|
||||||
|
HostDescription::Hostname(domain) => {
|
||||||
if let Some((host, _port)) = domain.split_once(':') {
|
if let Some((host, _port)) = domain.split_once(':') {
|
||||||
domains.insert(host.to_string());
|
static_domains.insert(host.to_string());
|
||||||
} else {
|
} else {
|
||||||
domains.insert(domain.clone());
|
static_domains.insert(domain.clone());
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
}
|
HostDescription::Pattern(pattern) => {
|
||||||
}
|
on_demand_domains.push((pattern.clone(), ent.on_demand_tls_ask.clone()));
|
||||||
need_cert = rx_need_cert.recv() => {
|
},
|
||||||
match need_cert {
|
|
||||||
Some(dom) => {
|
|
||||||
domains.insert(dom);
|
|
||||||
while let Ok(dom2) = rx_need_cert.try_recv() {
|
|
||||||
domains.insert(dom2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => bail!("rx_need_cert closed"),
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// only static_domains are refreshed
|
||||||
|
proc_domains = Some(ProcessedDomains { static_domains: static_domains.clone(), on_demand_domains });
|
||||||
|
self.domain_validation(static_domains, proc_domains.as_ref()).await
|
||||||
|
}
|
||||||
|
// renew static and on-demand domains
|
||||||
|
need_cert = rx_need_cert.recv() => {
|
||||||
|
match need_cert {
|
||||||
|
Some(dom) => {
|
||||||
|
let mut candidates: HashSet<String> = HashSet::new();
|
||||||
|
|
||||||
|
// collect certificates as much as possible
|
||||||
|
candidates.insert(dom);
|
||||||
|
while let Ok(dom2) = rx_need_cert.try_recv() {
|
||||||
|
candidates.insert(dom2);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.domain_validation(candidates, proc_domains.as_ref()).await
|
||||||
|
}
|
||||||
|
None => bail!("rx_need_cert closed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Now that we have our list of domains to check,
|
||||||
|
// actually do something
|
||||||
for dom in domains.iter() {
|
for dom in domains.iter() {
|
||||||
|
// Exclude from the list domains that were checked less than 60
|
||||||
|
// seconds ago
|
||||||
match t_last_check.get(dom) {
|
match t_last_check.get(dom) {
|
||||||
Some(t) if Instant::now() - *t < Duration::from_secs(60) => continue,
|
Some(t) if Instant::now() - *t < Duration::from_secs(60) => continue,
|
||||||
_ => t_last_check.insert(dom.to_string(), Instant::now()),
|
_ => t_last_check.insert(dom.to_string(), Instant::now()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Actual Let's Encrypt calls are done here (in sister function)
|
||||||
debug!("Checking cert for domain: {}", dom);
|
debug!("Checking cert for domain: {}", dom);
|
||||||
if let Err(e) = self.check_cert(dom).await {
|
if let Err(e) = self.check_cert(dom).await {
|
||||||
warn!("({}) Could not get certificate: {}", dom, e);
|
warn!("({}) Could not get certificate: {}", dom, e);
|
||||||
|
@ -114,18 +147,82 @@ impl CertStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_cert_for_https(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> {
|
async fn domain_validation(
|
||||||
// Check if domain is authorized
|
&self,
|
||||||
if !self
|
candidates: HashSet<String>,
|
||||||
.rx_proxy_config
|
maybe_proc_domains: Option<&ProcessedDomains>,
|
||||||
.borrow()
|
) -> HashSet<String> {
|
||||||
.entries
|
let mut domains: HashSet<String> = HashSet::new();
|
||||||
.iter()
|
|
||||||
.any(|ent| ent.host.matches(domain))
|
// Handle initialization
|
||||||
{
|
let proc_domains = match maybe_proc_domains {
|
||||||
bail!("Domain {} should not have a TLS certificate.", domain);
|
None => {
|
||||||
|
warn!("Proxy config is not yet loaded, refusing all certificate generation");
|
||||||
|
return domains;
|
||||||
|
}
|
||||||
|
Some(proc) => proc,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Filter certificates...
|
||||||
|
'outer: for candidate in candidates.into_iter() {
|
||||||
|
// Disallow obvious wrong domains...
|
||||||
|
if !candidate.contains('.') || candidate.ends_with(".local") {
|
||||||
|
warn!("{} is probably not a publicly accessible domain, skipping (a self-signed certificate will be used)", candidate);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to register domain as a static domain
|
||||||
|
if proc_domains.static_domains.contains(&candidate) {
|
||||||
|
trace!("domain {} validated as static domain", candidate);
|
||||||
|
domains.insert(candidate);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// It's not a static domain, maybe an on-demand domain?
|
||||||
|
for (pattern, maybe_check_url) in proc_domains.on_demand_domains.iter() {
|
||||||
|
// check glob pattern
|
||||||
|
if pattern.matches(&candidate) {
|
||||||
|
// if no check url is set, accept domain as long as it matches the pattern
|
||||||
|
let check_url = match maybe_check_url {
|
||||||
|
None => {
|
||||||
|
trace!(
|
||||||
|
"domain {} validated on glob pattern {} only",
|
||||||
|
candidate,
|
||||||
|
pattern
|
||||||
|
);
|
||||||
|
domains.insert(candidate);
|
||||||
|
continue 'outer;
|
||||||
|
}
|
||||||
|
Some(url) => url,
|
||||||
|
};
|
||||||
|
|
||||||
|
// if a check url is set, call it
|
||||||
|
// -- avoid DDoSing a backend
|
||||||
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
match self.on_demand_tls_ask(check_url, &candidate).await {
|
||||||
|
Ok(()) => {
|
||||||
|
trace!(
|
||||||
|
"domain {} validated on glob pattern {} and on check url {}",
|
||||||
|
candidate,
|
||||||
|
pattern,
|
||||||
|
check_url
|
||||||
|
);
|
||||||
|
domains.insert(candidate);
|
||||||
|
continue 'outer;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("domain {} validation refused on glob pattern {} and on check url {} with error: {}", candidate, pattern, check_url, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return domains;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This function is also in charge of the refresh of the domain names
|
||||||
|
fn get_cert_for_https(self: &Arc<Self>, domain: &str) -> Result<Arc<Cert>> {
|
||||||
// Check in local memory if it exists
|
// Check in local memory if it exists
|
||||||
if let Some(cert) = self.certs.read().unwrap().get(domain) {
|
if let Some(cert) = self.certs.read().unwrap().get(domain) {
|
||||||
if cert.is_old() {
|
if cert.is_old() {
|
||||||
|
@ -157,14 +254,15 @@ impl CertStore {
|
||||||
consul_certs.len()
|
consul_certs.len()
|
||||||
);
|
);
|
||||||
let mut loaded_certs: usize = 0;
|
let mut loaded_certs: usize = 0;
|
||||||
for (domain, cert) in consul_certs {
|
for (key, cert) in consul_certs {
|
||||||
let certser: CertSer = match serde_json::from_slice(&cert) {
|
let certser: CertSer = match serde_json::from_slice(&cert) {
|
||||||
Ok(cs) => cs,
|
Ok(cs) => cs,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Could not deserialize CertSer for {domain}: {e}");
|
warn!("Could not deserialize CertSer for {key}: {e}");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let domain = certser.hostname.clone();
|
||||||
|
|
||||||
let cert = match Cert::new(certser) {
|
let cert = match Cert::new(certser) {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
|
@ -186,6 +284,15 @@ impl CertStore {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check certificate ensure that the certificate is in the memory store
|
||||||
|
/// and that it does not need to be renewed.
|
||||||
|
///
|
||||||
|
/// If it's not in the memory store, it tries to load it from Consul,
|
||||||
|
/// if it's not in Consul, it calls Let's Encrypt.
|
||||||
|
///
|
||||||
|
/// If the certificate is outdated in the memory store, it tries to load
|
||||||
|
/// a more recent version in Consul, if the Consul version is also outdated,
|
||||||
|
/// it tries to renew it
|
||||||
pub async fn check_cert(self: &Arc<Self>, domain: &str) -> Result<()> {
|
pub async fn check_cert(self: &Arc<Self>, domain: &str) -> Result<()> {
|
||||||
// First, try locally.
|
// First, try locally.
|
||||||
{
|
{
|
||||||
|
@ -226,15 +333,10 @@ impl CertStore {
|
||||||
self.renew_cert(domain).await
|
self.renew_cert(domain).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This is the place where certificates are generated or renewed
|
||||||
pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<()> {
|
pub async fn renew_cert(self: &Arc<Self>, domain: &str) -> Result<()> {
|
||||||
info!("({}) Renewing certificate", domain);
|
info!("({}) Renewing certificate", domain);
|
||||||
|
|
||||||
// Basic sanity check (we could add more kinds of checks here)
|
|
||||||
// This is just to help avoid getting rate-limited against ACME server
|
|
||||||
if !domain.contains('.') || domain.ends_with(".local") {
|
|
||||||
bail!("Probably not a publicly accessible domain, skipping (a self-signed certificate will be used)");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- Acquire lock ----
|
// ---- Acquire lock ----
|
||||||
// the lock is acquired for half an hour,
|
// the lock is acquired for half an hour,
|
||||||
// so that in case of an error we won't retry before
|
// so that in case of an error we won't retry before
|
||||||
|
@ -350,6 +452,19 @@ impl CertStore {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn on_demand_tls_ask(&self, check_url: &str, domain: &str) -> Result<()> {
|
||||||
|
let httpcli = reqwest::Client::new();
|
||||||
|
let chall_url = format!("{}?domain={}", check_url, domain);
|
||||||
|
info!("({}) On-demand TLS check", domain);
|
||||||
|
|
||||||
|
let httpresp = httpcli.get(&chall_url).send().await?;
|
||||||
|
if httpresp.status() != reqwest::StatusCode::OK {
|
||||||
|
bail!("{} is not authorized for on-demand TLS", domain);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn check_domain_accessibility(&self, domain: &str, session: &str) -> Result<()> {
|
async fn check_domain_accessibility(&self, domain: &str, session: &str) -> Result<()> {
|
||||||
// Returns Ok(()) only if domain is a correct domain name that
|
// Returns Ok(()) only if domain is a correct domain name that
|
||||||
// redirects to this server
|
// redirects to this server
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use http::uri::Authority;
|
use http::uri::Authority;
|
||||||
|
|
69
src/https.rs
69
src/https.rs
|
@ -4,7 +4,7 @@ use std::sync::{atomic::Ordering, Arc};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
|
|
||||||
use accept_encoding_fork::Encoding;
|
use accept_encoding_fork::Encoding;
|
||||||
use async_compression::tokio::bufread::*;
|
use async_compression::tokio::bufread::*;
|
||||||
|
@ -24,7 +24,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
|
||||||
use opentelemetry::{metrics, KeyValue};
|
use opentelemetry::{metrics, KeyValue};
|
||||||
|
|
||||||
use crate::cert_store::{CertStore, StoreResolver};
|
use crate::cert_store::{CertStore, StoreResolver};
|
||||||
use crate::proxy_config::{ProxyConfig, ProxyEntry};
|
use crate::proxy_config::{HostDescription, ProxyConfig, ProxyEntry};
|
||||||
use crate::reverse_proxy;
|
use crate::reverse_proxy;
|
||||||
|
|
||||||
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600);
|
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(24 * 3600);
|
||||||
|
@ -41,7 +41,7 @@ pub struct HttpsConfig {
|
||||||
struct HttpsMetrics {
|
struct HttpsMetrics {
|
||||||
requests_received: metrics::Counter<u64>,
|
requests_received: metrics::Counter<u64>,
|
||||||
requests_served: metrics::Counter<u64>,
|
requests_served: metrics::Counter<u64>,
|
||||||
request_proxy_duration: metrics::ValueRecorder<f64>,
|
request_proxy_duration: metrics::Histogram<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn serve_https(
|
pub async fn serve_https(
|
||||||
|
@ -63,7 +63,7 @@ pub async fn serve_https(
|
||||||
.with_description("Total number of requests served over HTTPS")
|
.with_description("Total number of requests served over HTTPS")
|
||||||
.init(),
|
.init(),
|
||||||
request_proxy_duration: meter
|
request_proxy_duration: meter
|
||||||
.f64_value_recorder("https_request_proxy_duration")
|
.f64_histogram("https_request_proxy_duration")
|
||||||
.with_description("Duration between time when request was received, and time when backend returned status code and headers")
|
.with_description("Duration between time when request was received, and time when backend returned status code and headers")
|
||||||
.init(),
|
.init(),
|
||||||
});
|
});
|
||||||
|
@ -233,8 +233,10 @@ async fn select_target_and_proxy(
|
||||||
.entries
|
.entries
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|ent| {
|
.filter(|ent| {
|
||||||
ent.host.matches(host)
|
ent.flags.healthy
|
||||||
|
&& ent.url_prefix.host.matches(host)
|
||||||
&& ent
|
&& ent
|
||||||
|
.url_prefix
|
||||||
.path_prefix
|
.path_prefix
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|prefix| path.starts_with(prefix))
|
.map(|prefix| path.starts_with(prefix))
|
||||||
|
@ -243,7 +245,8 @@ async fn select_target_and_proxy(
|
||||||
.max_by_key(|ent| {
|
.max_by_key(|ent| {
|
||||||
(
|
(
|
||||||
ent.priority,
|
ent.priority,
|
||||||
ent.path_prefix
|
ent.url_prefix
|
||||||
|
.path_prefix
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|x| x.len() as i32)
|
.map(|x| x.len() as i32)
|
||||||
.unwrap_or(0),
|
.unwrap_or(0),
|
||||||
|
@ -269,15 +272,22 @@ async fn select_target_and_proxy(
|
||||||
);
|
);
|
||||||
proxy_to.calls_in_progress.fetch_add(1, Ordering::SeqCst);
|
proxy_to.calls_in_progress.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Forward to backend
|
||||||
debug!("{}{} -> {}", host, path, proxy_to);
|
debug!("{}{} -> {}", host, path, proxy_to);
|
||||||
trace!("Request: {:?}", req);
|
trace!("Request: {:?}", req);
|
||||||
|
|
||||||
let response = match do_proxy(https_config, remote_addr, req, proxy_to).await {
|
let response = if let Some(http_res) = try_redirect(host, path, proxy_to) {
|
||||||
|
// redirection middleware
|
||||||
|
http_res
|
||||||
|
} else {
|
||||||
|
// proxying to backend
|
||||||
|
match do_proxy(https_config, remote_addr, req, proxy_to).await {
|
||||||
Ok(resp) => resp,
|
Ok(resp) => resp,
|
||||||
Err(e) => Response::builder()
|
Err(e) => Response::builder()
|
||||||
.status(StatusCode::BAD_GATEWAY)
|
.status(StatusCode::BAD_GATEWAY)
|
||||||
.body(Body::from(format!("Proxy error: {}", e)))
|
.body(Body::from(format!("Proxy error: {}", e)))
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
proxy_to.calls_in_progress.fetch_sub(1, Ordering::SeqCst);
|
proxy_to.calls_in_progress.fetch_sub(1, Ordering::SeqCst);
|
||||||
|
@ -299,6 +309,51 @@ async fn select_target_and_proxy(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn try_redirect(req_host: &str, req_path: &str, proxy_to: &ProxyEntry) -> Option<Response<Body>> {
|
||||||
|
let maybe_redirect = proxy_to.redirects.iter().find(|(src, _, _)| {
|
||||||
|
let mut matched: bool = src.host.matches(req_host);
|
||||||
|
|
||||||
|
if let Some(path) = &src.path_prefix {
|
||||||
|
matched &= req_path.starts_with(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
matched
|
||||||
|
});
|
||||||
|
|
||||||
|
let (src_prefix, dst_prefix, code) = match maybe_redirect {
|
||||||
|
None => return None,
|
||||||
|
Some(redirect) => redirect,
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_host = match &dst_prefix.host {
|
||||||
|
HostDescription::Hostname(h) => h,
|
||||||
|
_ => unreachable!(), // checked when ProxyEntry is created
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_prefix = dst_prefix.path_prefix.as_deref().unwrap_or("");
|
||||||
|
let original_prefix = src_prefix.path_prefix.as_deref().unwrap_or("");
|
||||||
|
let suffix = &req_path[original_prefix.len()..];
|
||||||
|
|
||||||
|
let uri = format!("https://{}{}{}", new_host, new_prefix, suffix);
|
||||||
|
|
||||||
|
let status = match StatusCode::from_u16(*code) {
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"Couldn't redirect {}{} to {} as code {} in invalid: {}",
|
||||||
|
req_host, req_path, uri, code, e
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Ok(sc) => sc,
|
||||||
|
};
|
||||||
|
|
||||||
|
Response::builder()
|
||||||
|
.header("Location", uri.clone())
|
||||||
|
.status(status)
|
||||||
|
.body(Body::from(uri))
|
||||||
|
.ok()
|
||||||
|
}
|
||||||
|
|
||||||
async fn do_proxy(
|
async fn do_proxy(
|
||||||
https_config: &HttpsConfig,
|
https_config: &HttpsConfig,
|
||||||
remote_addr: SocketAddr,
|
remote_addr: SocketAddr,
|
||||||
|
|
42
src/main.rs
42
src/main.rs
|
@ -1,9 +1,10 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate anyhow;
|
extern crate anyhow;
|
||||||
|
|
||||||
use log::*;
|
use std::collections::BTreeMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
use tracing::*;
|
||||||
|
|
||||||
use futures::{FutureExt, TryFutureExt};
|
use futures::{FutureExt, TryFutureExt};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -117,7 +118,10 @@ async fn main() {
|
||||||
if std::env::var("RUST_LOG").is_err() {
|
if std::env::var("RUST_LOG").is_err() {
|
||||||
std::env::set_var("RUST_LOG", "tricot=info")
|
std::env::set_var("RUST_LOG", "tricot=info")
|
||||||
}
|
}
|
||||||
pretty_env_logger::init();
|
tracing_subscriber::fmt()
|
||||||
|
.with_writer(std::io::stderr)
|
||||||
|
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
|
||||||
|
.init();
|
||||||
|
|
||||||
// Abort on panic (same behavior as in Go)
|
// Abort on panic (same behavior as in Go)
|
||||||
std::panic::set_hook(Box::new(|panic_info| {
|
std::panic::set_hook(Box::new(|panic_info| {
|
||||||
|
@ -218,6 +222,8 @@ async fn dump_config_on_change(
|
||||||
mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
mut must_exit: watch::Receiver<bool>,
|
mut must_exit: watch::Receiver<bool>,
|
||||||
) {
|
) {
|
||||||
|
let mut old_cfg: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
select!(
|
select!(
|
||||||
c = rx_proxy_config.changed() => {
|
c = rx_proxy_config.changed() => {
|
||||||
|
@ -227,11 +233,37 @@ async fn dump_config_on_change(
|
||||||
}
|
}
|
||||||
_ = must_exit.changed() => continue,
|
_ = must_exit.changed() => continue,
|
||||||
);
|
);
|
||||||
println!("---- PROXY CONFIGURATION ----");
|
|
||||||
for ent in rx_proxy_config.borrow().entries.iter() {
|
let cfg: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
|
||||||
println!(" {}", ent);
|
if cfg != old_cfg {
|
||||||
|
let mut cfg_map = BTreeMap::<_, Vec<_>>::new();
|
||||||
|
for ent in cfg.entries.iter() {
|
||||||
|
cfg_map
|
||||||
|
.entry((&ent.url_prefix.host, &ent.url_prefix.path_prefix))
|
||||||
|
.or_default()
|
||||||
|
.push(ent);
|
||||||
|
}
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"---- PROXY CONFIGURATION at {} ----",
|
||||||
|
chrono::offset::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
|
||||||
|
);
|
||||||
|
for ((host, prefix), ents) in cfg_map.iter_mut() {
|
||||||
|
println!("{}{}:", host, prefix.as_deref().unwrap_or_default());
|
||||||
|
for ent in ents.iter() {
|
||||||
|
print!(" ");
|
||||||
|
if !ent.flags.healthy {
|
||||||
|
print!("/!\\ ");
|
||||||
|
} else {
|
||||||
|
print!(" ");
|
||||||
|
}
|
||||||
|
println!("{}", ent);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
println!();
|
println!();
|
||||||
|
|
||||||
|
old_cfg = cfg;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,34 +4,37 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
|
|
||||||
use hyper::{
|
use hyper::{
|
||||||
header::CONTENT_TYPE,
|
header::CONTENT_TYPE,
|
||||||
service::{make_service_fn, service_fn},
|
service::{make_service_fn, service_fn},
|
||||||
Body, Method, Request, Response, Server,
|
Body, Method, Request, Response, Server,
|
||||||
};
|
};
|
||||||
use opentelemetry_prometheus::PrometheusExporter;
|
use opentelemetry::sdk::metrics;
|
||||||
use prometheus::{Encoder, TextEncoder};
|
use prometheus::{Encoder, TextEncoder};
|
||||||
|
|
||||||
pub struct MetricsServer {
|
pub struct MetricsServer {
|
||||||
bind_addr: Option<SocketAddr>,
|
bind_addr: Option<SocketAddr>,
|
||||||
exporter: PrometheusExporter,
|
registry: prometheus::Registry,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetricsServer {
|
impl MetricsServer {
|
||||||
pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
|
pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
|
||||||
|
let registry = prometheus::Registry::new();
|
||||||
let exporter = opentelemetry_prometheus::exporter()
|
let exporter = opentelemetry_prometheus::exporter()
|
||||||
.with_default_summary_quantiles(vec![0.25, 0.5, 0.75, 0.9, 0.95, 0.99])
|
.with_registry(registry.clone())
|
||||||
.with_default_histogram_boundaries(vec![
|
.with_aggregation_selector(AggregationSelector)
|
||||||
0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07,
|
.without_counter_suffixes()
|
||||||
0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30., 40.,
|
.build()
|
||||||
50., 60., 70., 100.,
|
.expect("build prometheus registry");
|
||||||
])
|
let mp = metrics::MeterProvider::builder()
|
||||||
.init();
|
.with_reader(exporter)
|
||||||
|
.build();
|
||||||
|
opentelemetry::global::set_meter_provider(mp);
|
||||||
Self {
|
Self {
|
||||||
bind_addr,
|
bind_addr,
|
||||||
exporter,
|
registry,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +73,7 @@ impl MetricsServer {
|
||||||
(&Method::GET, "/metrics") => {
|
(&Method::GET, "/metrics") => {
|
||||||
let mut buffer = vec![];
|
let mut buffer = vec![];
|
||||||
let encoder = TextEncoder::new();
|
let encoder = TextEncoder::new();
|
||||||
let metric_families = self.exporter.registry().gather();
|
let metric_families = self.registry.gather();
|
||||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
|
@ -88,3 +91,21 @@ impl MetricsServer {
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct AggregationSelector;
|
||||||
|
|
||||||
|
impl metrics::reader::AggregationSelector for AggregationSelector {
|
||||||
|
fn aggregation(&self, kind: metrics::InstrumentKind) -> metrics::Aggregation {
|
||||||
|
match kind {
|
||||||
|
metrics::InstrumentKind::Histogram => metrics::Aggregation::ExplicitBucketHistogram {
|
||||||
|
boundaries: vec![
|
||||||
|
0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07,
|
||||||
|
0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30.,
|
||||||
|
40., 50., 60., 70., 100.,
|
||||||
|
],
|
||||||
|
record_min_max: true,
|
||||||
|
},
|
||||||
|
_ => metrics::reader::DefaultAggregationSelector::new().aggregation(kind),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,22 +1,19 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::{atomic, Arc};
|
use std::sync::{atomic, Arc};
|
||||||
use std::{cmp, time::Duration};
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use opentelemetry::{metrics, KeyValue};
|
use opentelemetry::{metrics, KeyValue};
|
||||||
|
|
||||||
use futures::future::BoxFuture;
|
use tokio::{select, sync::watch};
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use tracing::*;
|
||||||
|
|
||||||
use log::*;
|
|
||||||
use tokio::{select, sync::watch, time::sleep};
|
|
||||||
|
|
||||||
use crate::consul;
|
use crate::consul;
|
||||||
|
|
||||||
// ---- Extract proxy config from Consul catalog ----
|
// ---- Extract proxy config from Consul catalog ----
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||||
pub enum HostDescription {
|
pub enum HostDescription {
|
||||||
Hostname(String),
|
Hostname(String),
|
||||||
Pattern(glob::Pattern),
|
Pattern(glob::Pattern),
|
||||||
|
@ -43,17 +40,53 @@ impl std::fmt::Display for HostDescription {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
HostDescription::Hostname(h) => write!(f, "{}", h),
|
HostDescription::Hostname(h) => write!(f, "{}", h),
|
||||||
HostDescription::Pattern(p) => write!(f, "Pattern('{}')", p.as_str()),
|
HostDescription::Pattern(p) => write!(f, "[{}]", p.as_str()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct UrlPrefix {
|
||||||
|
/// Publicly exposed TLS hostnames for matching this rule
|
||||||
|
pub host: HostDescription,
|
||||||
|
|
||||||
|
/// Path prefix for matching this rule
|
||||||
|
pub path_prefix: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for UrlPrefix {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.host == other.host && self.path_prefix == other.path_prefix
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Eq for UrlPrefix {}
|
||||||
|
|
||||||
|
impl UrlPrefix {
|
||||||
|
fn new(raw_prefix: &str) -> Option<Self> {
|
||||||
|
let (raw_host, path_prefix) = match raw_prefix.find('/') {
|
||||||
|
Some(i) => {
|
||||||
|
let (host, pp) = raw_prefix.split_at(i);
|
||||||
|
(host, Some(pp.to_string()))
|
||||||
|
}
|
||||||
|
None => (raw_prefix, None),
|
||||||
|
};
|
||||||
|
|
||||||
|
let host = match HostDescription::new(raw_host) {
|
||||||
|
Ok(h) => h,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Invalid hostname pattern {}: {}", raw_host, e);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(Self { host, path_prefix })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ProxyEntry {
|
pub struct ProxyEntry {
|
||||||
/// Publicly exposed TLS hostnames for matching this rule
|
/// An Url prefix is made of a host and maybe a path prefix
|
||||||
pub host: HostDescription,
|
pub url_prefix: UrlPrefix,
|
||||||
/// Path prefix for matching this rule
|
|
||||||
pub path_prefix: Option<String>,
|
|
||||||
/// Priority with which this rule is considered (highest first)
|
/// Priority with which this rule is considered (highest first)
|
||||||
pub priority: u32,
|
pub priority: u32,
|
||||||
|
|
||||||
|
@ -71,14 +104,85 @@ pub struct ProxyEntry {
|
||||||
/// when matching this rule
|
/// when matching this rule
|
||||||
pub add_headers: Vec<(String, String)>,
|
pub add_headers: Vec<(String, String)>,
|
||||||
|
|
||||||
|
/// Try to match all these redirection before forwarding to the backend
|
||||||
|
/// when matching this rule
|
||||||
|
pub redirects: Vec<(UrlPrefix, UrlPrefix, u16)>,
|
||||||
|
|
||||||
|
/// Wether or not the domain must be validated before asking a certificate
|
||||||
|
/// to let's encrypt (only for Glob patterns)
|
||||||
|
pub on_demand_tls_ask: Option<String>,
|
||||||
|
|
||||||
/// Number of calls in progress, used to deprioritize slow back-ends
|
/// Number of calls in progress, used to deprioritize slow back-ends
|
||||||
pub calls_in_progress: atomic::AtomicI64,
|
pub calls_in_progress: atomic::AtomicI64,
|
||||||
/// Time of last call, used for round-robin selection
|
/// Time of last call, used for round-robin selection
|
||||||
pub last_call: atomic::AtomicI64,
|
pub last_call: atomic::AtomicI64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
impl PartialEq for ProxyEntry {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.url_prefix == other.url_prefix
|
||||||
|
&& self.priority == other.priority
|
||||||
|
&& self.service_name == other.service_name
|
||||||
|
&& self.target_addr == other.target_addr
|
||||||
|
&& self.https_target == other.https_target
|
||||||
|
&& self.flags == other.flags
|
||||||
|
&& self.add_headers == other.add_headers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Eq for ProxyEntry {}
|
||||||
|
|
||||||
|
impl ProxyEntry {
|
||||||
|
fn new(
|
||||||
|
service_name: String,
|
||||||
|
frontend: MatchTag,
|
||||||
|
target_addr: SocketAddr,
|
||||||
|
middleware: &[ConfigTag],
|
||||||
|
flags: ProxyEntryFlags,
|
||||||
|
) -> Self {
|
||||||
|
let (url_prefix, priority, https_target) = match frontend {
|
||||||
|
MatchTag::Http(u, p) => (u, p, false),
|
||||||
|
MatchTag::HttpWithTls(u, p) => (u, p, true),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut add_headers = vec![];
|
||||||
|
let mut redirects = vec![];
|
||||||
|
let mut on_demand_tls_ask: Option<String> = None;
|
||||||
|
for mid in middleware.into_iter() {
|
||||||
|
// LocalLb and GlobalLb are handled in the parent function
|
||||||
|
match mid {
|
||||||
|
ConfigTag::AddHeader(k, v) => add_headers.push((k.to_string(), v.clone())),
|
||||||
|
ConfigTag::AddRedirect(m, r, c) => redirects.push(((*m).clone(), (*r).clone(), *c)),
|
||||||
|
ConfigTag::OnDemandTlsAsk(url) => on_demand_tls_ask = Some(url.to_string()),
|
||||||
|
ConfigTag::LocalLb | ConfigTag::GlobalLb => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
ProxyEntry {
|
||||||
|
// id
|
||||||
|
service_name,
|
||||||
|
// frontend
|
||||||
|
url_prefix,
|
||||||
|
priority,
|
||||||
|
// backend
|
||||||
|
target_addr,
|
||||||
|
https_target,
|
||||||
|
// middleware
|
||||||
|
flags,
|
||||||
|
add_headers,
|
||||||
|
redirects,
|
||||||
|
on_demand_tls_ask,
|
||||||
|
// internal
|
||||||
|
last_call: atomic::AtomicI64::from(0),
|
||||||
|
calls_in_progress: atomic::AtomicI64::from(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
|
||||||
pub struct ProxyEntryFlags {
|
pub struct ProxyEntryFlags {
|
||||||
|
/// Is the target healthy?
|
||||||
|
pub healthy: bool,
|
||||||
|
|
||||||
/// Is the target the same node as we are running on?
|
/// Is the target the same node as we are running on?
|
||||||
/// (if yes priorize it over other matching targets)
|
/// (if yes priorize it over other matching targets)
|
||||||
pub same_node: bool,
|
pub same_node: bool,
|
||||||
|
@ -101,10 +205,13 @@ impl std::fmt::Display for ProxyEntry {
|
||||||
write!(
|
write!(
|
||||||
f,
|
f,
|
||||||
"{}{} {}",
|
"{}{} {}",
|
||||||
self.host,
|
self.url_prefix.host,
|
||||||
self.path_prefix.as_deref().unwrap_or_default(),
|
self.url_prefix.path_prefix.as_deref().unwrap_or_default(),
|
||||||
self.priority
|
self.priority
|
||||||
)?;
|
)?;
|
||||||
|
if !self.flags.healthy {
|
||||||
|
write!(f, " UNHEALTHY")?;
|
||||||
|
}
|
||||||
if self.flags.same_node {
|
if self.flags.same_node {
|
||||||
write!(f, " OURSELF")?;
|
write!(f, " OURSELF")?;
|
||||||
} else if self.flags.same_site {
|
} else if self.flags.same_site {
|
||||||
|
@ -122,136 +229,176 @@ impl std::fmt::Display for ProxyEntry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct ProxyConfig {
|
pub struct ProxyConfig {
|
||||||
pub entries: Vec<ProxyEntry>,
|
pub entries: Vec<ProxyEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retry_to_time(retries: u32, max_time: Duration) -> Duration {
|
#[derive(Debug)]
|
||||||
// 1.2^x seems to be a good value to exponentially increase time at a good pace
|
enum ParsedTag<'a> {
|
||||||
// eg. 1.2^32 = 341 seconds ~= 5 minutes - ie. after 32 retries we wait 5
|
Frontend(MatchTag),
|
||||||
// minutes
|
Middleware(ConfigTag<'a>),
|
||||||
Duration::from_secs(cmp::min(
|
|
||||||
max_time.as_secs(),
|
|
||||||
1.2f64.powf(retries as f64) as u64,
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_tricot_tag(
|
#[derive(Debug)]
|
||||||
service_name: String,
|
enum MatchTag {
|
||||||
tag: &str,
|
/// HTTP backend (plain text)
|
||||||
target_addr: SocketAddr,
|
Http(UrlPrefix, u32),
|
||||||
add_headers: &[(String, String)],
|
/// HTTPS backend (TLS encrypted)
|
||||||
flags: ProxyEntryFlags,
|
HttpWithTls(UrlPrefix, u32),
|
||||||
) -> Option<ProxyEntry> {
|
}
|
||||||
let splits = tag.split(' ').collect::<Vec<_>>();
|
|
||||||
if (splits.len() != 2 && splits.len() != 3)
|
#[derive(Debug)]
|
||||||
|| (splits[0] != "tricot" && splits[0] != "tricot-https")
|
enum ConfigTag<'a> {
|
||||||
{
|
AddHeader(&'a str, String),
|
||||||
return None;
|
AddRedirect(UrlPrefix, UrlPrefix, u16),
|
||||||
|
OnDemandTlsAsk(&'a str),
|
||||||
|
GlobalLb,
|
||||||
|
LocalLb,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_tricot_tags(tag: &str) -> Option<ParsedTag> {
|
||||||
|
let splits = tag.splitn(4, ' ').collect::<Vec<_>>();
|
||||||
|
let parsed_tag = match splits.as_slice() {
|
||||||
|
["tricot", raw_prefix, maybe_priority @ ..] => {
|
||||||
|
// priority is set to 100 when value is invalid or missing
|
||||||
|
let priority: u32 = maybe_priority
|
||||||
|
.iter()
|
||||||
|
.next()
|
||||||
|
.map_or(Ok(100), |x| x.parse::<u32>())
|
||||||
|
.unwrap_or(100);
|
||||||
|
UrlPrefix::new(raw_prefix)
|
||||||
|
.map(|prefix| ParsedTag::Frontend(MatchTag::Http(prefix, priority)))
|
||||||
}
|
}
|
||||||
|
["tricot-https", raw_prefix, maybe_priority @ ..] => {
|
||||||
let (host, path_prefix) = match splits[1].find('/') {
|
// priority is set to 100 when value is invalid or missing
|
||||||
Some(i) => {
|
let priority: u32 = maybe_priority
|
||||||
let (host, pp) = splits[1].split_at(i);
|
.iter()
|
||||||
(host, Some(pp.to_string()))
|
.next()
|
||||||
|
.map_or(Ok(100), |x| x.parse::<u32>())
|
||||||
|
.unwrap_or(100);
|
||||||
|
UrlPrefix::new(raw_prefix)
|
||||||
|
.map(|prefix| ParsedTag::Frontend(MatchTag::HttpWithTls(prefix, priority)))
|
||||||
}
|
}
|
||||||
None => (splits[1], None),
|
["tricot-add-header", header_key, header_values @ ..] => Some(ParsedTag::Middleware(
|
||||||
};
|
ConfigTag::AddHeader(header_key, header_values.join(" ")),
|
||||||
|
)),
|
||||||
let priority = match splits.len() {
|
["tricot-add-redirect", raw_match, raw_replace, maybe_raw_code @ ..] => {
|
||||||
3 => splits[2].parse().ok()?,
|
let (p_match, p_replace) =
|
||||||
_ => 100,
|
match (UrlPrefix::new(raw_match), UrlPrefix::new(raw_replace)) {
|
||||||
};
|
(Some(m), Some(r)) => (m, r),
|
||||||
|
_ => {
|
||||||
let host = match HostDescription::new(host) {
|
debug!(
|
||||||
Ok(h) => h,
|
"tag {} is ignored, one of the url prefix can't be parsed",
|
||||||
Err(e) => {
|
tag
|
||||||
warn!("Invalid hostname pattern {}: {}", host, e);
|
);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Some(ProxyEntry {
|
if matches!(p_replace.host, HostDescription::Pattern(_)) {
|
||||||
service_name,
|
debug!(
|
||||||
target_addr,
|
"tag {} ignored as redirect to a glob pattern is not supported",
|
||||||
https_target: (splits[0] == "tricot-https"),
|
tag
|
||||||
host,
|
);
|
||||||
flags,
|
return None;
|
||||||
path_prefix,
|
|
||||||
priority,
|
|
||||||
add_headers: add_headers.to_vec(),
|
|
||||||
last_call: atomic::AtomicI64::from(0),
|
|
||||||
calls_in_progress: atomic::AtomicI64::from(0),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_tricot_add_header_tag(tag: &str) -> Option<(String, String)> {
|
|
||||||
let splits = tag.splitn(3, ' ').collect::<Vec<_>>();
|
|
||||||
if splits.len() == 3 && splits[0] == "tricot-add-header" {
|
|
||||||
Some((splits[1].to_string(), splits[2].to_string()))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let maybe_parsed_code = maybe_raw_code
|
||||||
|
.iter()
|
||||||
|
.next()
|
||||||
|
.map(|c| c.parse::<u16>().ok())
|
||||||
|
.flatten();
|
||||||
|
let http_code = match maybe_parsed_code {
|
||||||
|
Some(301) => 301,
|
||||||
|
Some(302) => 302,
|
||||||
|
Some(303) => 303,
|
||||||
|
Some(307) => 307,
|
||||||
|
_ => {
|
||||||
|
debug!(
|
||||||
|
"tag {} has a missing or invalid http code, setting it to 302",
|
||||||
|
tag
|
||||||
|
);
|
||||||
|
302
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(ParsedTag::Middleware(ConfigTag::AddRedirect(
|
||||||
|
p_match, p_replace, http_code,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
["tricot-on-demand-tls-ask", url, ..] => {
|
||||||
|
Some(ParsedTag::Middleware(ConfigTag::OnDemandTlsAsk(url)))
|
||||||
|
}
|
||||||
|
["tricot-global-lb", ..] => Some(ParsedTag::Middleware(ConfigTag::GlobalLb)),
|
||||||
|
["tricot-local-lb", ..] => Some(ParsedTag::Middleware(ConfigTag::LocalLb)),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!("tag {} parsed as {:?}", tag, parsed_tag);
|
||||||
|
parsed_tag
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_consul_catalog(
|
fn parse_consul_service(
|
||||||
catalog: &consul::catalog::CatalogNode,
|
s: &consul::catalog::HealthServiceNode,
|
||||||
same_node: bool,
|
mut flags: ProxyEntryFlags,
|
||||||
same_site: bool,
|
|
||||||
) -> Vec<ProxyEntry> {
|
) -> Vec<ProxyEntry> {
|
||||||
trace!("Parsing node catalog: {:#?}", catalog);
|
trace!("Parsing service: {:#?}", s);
|
||||||
|
|
||||||
let mut entries = vec![];
|
let ip_addr = match s.service.address.parse() {
|
||||||
|
|
||||||
for (_, svc) in catalog.services.iter() {
|
|
||||||
let ip_addr = match svc.address.parse() {
|
|
||||||
Ok(ip) => ip,
|
Ok(ip) => ip,
|
||||||
_ => match catalog.node.address.parse() {
|
_ => match s.node.address.parse() {
|
||||||
Ok(ip) => ip,
|
Ok(ip) => ip,
|
||||||
_ => {
|
_ => {
|
||||||
warn!(
|
warn!(
|
||||||
"Could not get address for service {} at node {}",
|
"Could not get address for service {} at node {}",
|
||||||
svc.service, catalog.node.node
|
s.service.service, s.node.node
|
||||||
);
|
);
|
||||||
continue;
|
return vec![];
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
let addr = SocketAddr::new(ip_addr, svc.port);
|
let addr = SocketAddr::new(ip_addr, s.service.port);
|
||||||
|
|
||||||
let (site_lb, global_lb) = if svc.tags.contains(&"tricot-global-lb".into()) {
|
// tag parsing
|
||||||
(false, true)
|
let mut collected_middleware = vec![];
|
||||||
} else if svc.tags.contains(&"tricot-site-lb".into()) {
|
let mut collected_frontends = vec![];
|
||||||
(true, false)
|
for tag in s.service.tags.iter() {
|
||||||
} else {
|
match parse_tricot_tags(tag) {
|
||||||
(false, false)
|
Some(ParsedTag::Frontend(x)) => collected_frontends.push(x),
|
||||||
|
Some(ParsedTag::Middleware(y)) => collected_middleware.push(y),
|
||||||
|
_ => trace!(
|
||||||
|
"service {}: tag '{}' could not be parsed",
|
||||||
|
s.service.service,
|
||||||
|
tag
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// some legacy processing that would need a refactor later
|
||||||
|
for mid in collected_middleware.iter() {
|
||||||
|
match mid {
|
||||||
|
ConfigTag::GlobalLb => flags.global_lb = true,
|
||||||
|
ConfigTag::LocalLb => flags.site_lb = true,
|
||||||
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
let flags = ProxyEntryFlags {
|
|
||||||
same_node,
|
|
||||||
same_site,
|
|
||||||
site_lb,
|
|
||||||
global_lb,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut add_headers = vec![];
|
|
||||||
for tag in svc.tags.iter() {
|
|
||||||
if let Some(pair) = parse_tricot_add_header_tag(tag) {
|
|
||||||
add_headers.push(pair);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for tag in svc.tags.iter() {
|
// build proxy entries
|
||||||
if let Some(ent) =
|
let entries = collected_frontends
|
||||||
parse_tricot_tag(svc.service.clone(), tag, addr, &add_headers[..], flags)
|
.into_iter()
|
||||||
{
|
.map(|frt| {
|
||||||
entries.push(ent);
|
ProxyEntry::new(
|
||||||
}
|
s.service.service.clone(),
|
||||||
}
|
frt,
|
||||||
}
|
addr,
|
||||||
|
collected_middleware.as_ref(),
|
||||||
|
flags,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
trace!("Result of parsing catalog:");
|
trace!("Result of parsing service:");
|
||||||
for ent in entries.iter() {
|
for ent in entries.iter() {
|
||||||
trace!(" {}", ent);
|
trace!(" {}", ent);
|
||||||
}
|
}
|
||||||
|
@ -259,13 +406,6 @@ fn parse_consul_catalog(
|
||||||
entries
|
entries
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct NodeWatchState {
|
|
||||||
last_idx: Option<usize>,
|
|
||||||
last_catalog: Option<consul::catalog::CatalogNode>,
|
|
||||||
retries: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn spawn_proxy_config_task(
|
pub fn spawn_proxy_config_task(
|
||||||
consul: consul::Consul,
|
consul: consul::Consul,
|
||||||
local_node: String,
|
local_node: String,
|
||||||
|
@ -279,110 +419,55 @@ pub fn spawn_proxy_config_task(
|
||||||
let consul = Arc::new(consul);
|
let consul = Arc::new(consul);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut nodes = HashMap::new();
|
let mut catalog_rx = consul.watch_all_service_health(Duration::from_secs(300));
|
||||||
let mut watches = FuturesUnordered::<BoxFuture<'static, (String, Result<_>)>>::new();
|
let mut local_node_site = None;
|
||||||
|
|
||||||
let mut node_site = HashMap::new();
|
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
let list_nodes = select! {
|
select! {
|
||||||
ln = consul.catalog_node_list(None) => ln,
|
_ = catalog_rx.changed() => (),
|
||||||
_ = must_exit.changed() => continue,
|
_ = must_exit.changed() => continue,
|
||||||
};
|
};
|
||||||
|
|
||||||
match list_nodes {
|
let services = catalog_rx.borrow_and_update().clone();
|
||||||
Ok(consul_nodes) => {
|
if local_node_site.is_none() {
|
||||||
info!("Watched consul nodes: {:?}", consul_nodes);
|
for (_, svcnodes) in services.iter() {
|
||||||
for consul_node in consul_nodes.into_inner() {
|
for svcnode in svcnodes.iter() {
|
||||||
let node = &consul_node.node;
|
if svcnode.node.node == local_node {
|
||||||
if !nodes.contains_key(node) {
|
if let Some(site) = svcnode.node.meta.get("site") {
|
||||||
nodes.insert(node.clone(), NodeWatchState::default());
|
local_node_site = Some(site.to_string());
|
||||||
|
|
||||||
let node = node.to_string();
|
|
||||||
let consul = consul.clone();
|
|
||||||
|
|
||||||
watches.push(Box::pin(async move {
|
|
||||||
let res = consul.catalog_node(&node, None).await;
|
|
||||||
(node, res)
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
if let Some(site) = consul_node.meta.get("site") {
|
|
||||||
node_site.insert(node.clone(), site.clone());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
warn!("Could not get Consul node list: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let next_watch = select! {
|
|
||||||
nw = watches.next() => nw,
|
|
||||||
_ = must_exit.changed() => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (node, res): (String, Result<_>) = match next_watch {
|
|
||||||
Some(v) => v,
|
|
||||||
None => {
|
|
||||||
warn!("No nodes currently watched in proxy_config.rs");
|
|
||||||
sleep(Duration::from_secs(10)).await;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(res) => {
|
|
||||||
let new_idx = res.index();
|
|
||||||
let catalog = res.into_inner();
|
|
||||||
|
|
||||||
let mut watch_state = nodes.get_mut(&node).unwrap();
|
|
||||||
watch_state.last_idx = Some(new_idx);
|
|
||||||
watch_state.last_catalog = catalog;
|
|
||||||
watch_state.retries = 0;
|
|
||||||
|
|
||||||
let idx = watch_state.last_idx;
|
|
||||||
let consul = consul.clone();
|
|
||||||
watches.push(Box::pin(async move {
|
|
||||||
let res = consul.catalog_node(&node, idx).await;
|
|
||||||
(node, res)
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let mut watch_state = nodes.get_mut(&node).unwrap();
|
|
||||||
watch_state.retries += 1;
|
|
||||||
watch_state.last_idx = None;
|
|
||||||
|
|
||||||
let will_retry_in =
|
|
||||||
retry_to_time(watch_state.retries, Duration::from_secs(600));
|
|
||||||
error!(
|
|
||||||
"Failed to query consul for node {}. Will retry in {}s. {}",
|
|
||||||
node,
|
|
||||||
will_retry_in.as_secs(),
|
|
||||||
e
|
|
||||||
);
|
|
||||||
|
|
||||||
let consul = consul.clone();
|
|
||||||
watches.push(Box::pin(async move {
|
|
||||||
sleep(will_retry_in).await;
|
|
||||||
let res = consul.catalog_node(&node, None).await;
|
|
||||||
(node, res)
|
|
||||||
}));
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut entries = vec![];
|
let mut entries = vec![];
|
||||||
for (node_name, watch_state) in nodes.iter() {
|
|
||||||
if let Some(catalog) = &watch_state.last_catalog {
|
for (_service, svcnodes) in services.iter() {
|
||||||
let same_node = *node_name == local_node;
|
for svcnode in svcnodes.iter() {
|
||||||
let same_site = match (node_site.get(node_name), node_site.get(&local_node)) {
|
let healthy = !svcnode.checks.iter().any(|x| x.status == "critical");
|
||||||
|
|
||||||
|
let same_node = svcnode.node.node == local_node;
|
||||||
|
let same_site = match (svcnode.node.meta.get("site"), local_node_site.as_ref())
|
||||||
|
{
|
||||||
(Some(s1), Some(s2)) => s1 == s2,
|
(Some(s1), Some(s2)) => s1 == s2,
|
||||||
_ => false,
|
_ => false,
|
||||||
};
|
};
|
||||||
|
|
||||||
entries.extend(parse_consul_catalog(catalog, same_node, same_site));
|
let flags = ProxyEntryFlags {
|
||||||
|
healthy,
|
||||||
|
same_node,
|
||||||
|
same_site,
|
||||||
|
site_lb: false,
|
||||||
|
global_lb: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
entries.extend(parse_consul_service(&svcnode, flags));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
entries.sort_by_cached_key(|ent| ent.to_string());
|
||||||
|
|
||||||
let config = ProxyConfig { entries };
|
let config = ProxyConfig { entries };
|
||||||
|
|
||||||
tx.send(Arc::new(config)).expect("Internal error");
|
tx.send(Arc::new(config)).expect("Internal error");
|
||||||
|
@ -397,7 +482,7 @@ pub fn spawn_proxy_config_task(
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
struct ProxyConfigMetrics {
|
struct ProxyConfigMetrics {
|
||||||
_proxy_config_entries: metrics::ValueObserver<u64>,
|
_proxy_config_entries: metrics::ObservableGauge<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProxyConfigMetrics {
|
impl ProxyConfigMetrics {
|
||||||
|
@ -405,12 +490,13 @@ impl ProxyConfigMetrics {
|
||||||
let meter = opentelemetry::global::meter("tricot");
|
let meter = opentelemetry::global::meter("tricot");
|
||||||
Self {
|
Self {
|
||||||
_proxy_config_entries: meter
|
_proxy_config_entries: meter
|
||||||
.u64_value_observer("proxy_config_entries", move |observer| {
|
.u64_observable_gauge("proxy_config_entries")
|
||||||
|
.with_callback(move |observer| {
|
||||||
let mut patterns = HashMap::new();
|
let mut patterns = HashMap::new();
|
||||||
for ent in rx.borrow().entries.iter() {
|
for ent in rx.borrow().entries.iter() {
|
||||||
let attrs = (
|
let attrs = (
|
||||||
ent.host.to_string(),
|
ent.url_prefix.host.to_string(),
|
||||||
ent.path_prefix.clone().unwrap_or_default(),
|
ent.url_prefix.path_prefix.clone().unwrap_or_default(),
|
||||||
ent.service_name.clone(),
|
ent.service_name.clone(),
|
||||||
);
|
);
|
||||||
*patterns.entry(attrs).or_default() += 1;
|
*patterns.entry(attrs).or_default() += 1;
|
||||||
|
@ -440,8 +526,8 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_parse_tricot_add_header_tag() {
|
fn test_parse_tricot_add_header_tag() {
|
||||||
match parse_tricot_add_header_tag("tricot-add-header Content-Security-Policy default-src 'none'; img-src 'self'; script-src 'self'; style-src 'self'") {
|
match parse_tricot_tags("tricot-add-header Content-Security-Policy default-src 'none'; img-src 'self'; script-src 'self'; style-src 'self'") {
|
||||||
Some((name, value)) => {
|
Some(ParsedTag::Middleware(ConfigTag::AddHeader(name, value))) => {
|
||||||
assert_eq!(name, "Content-Security-Policy");
|
assert_eq!(name, "Content-Security-Policy");
|
||||||
assert_eq!(value, "default-src 'none'; img-src 'self'; script-src 'self'; style-src 'self'");
|
assert_eq!(value, "default-src 'none'; img-src 'self'; script-src 'self'; style-src 'self'");
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::sync::Arc;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::*;
|
use tracing::*;
|
||||||
|
|
||||||
use http::{header::HeaderName, StatusCode};
|
use http::{header::HeaderName, StatusCode};
|
||||||
use hyper::header::{HeaderMap, HeaderValue};
|
use hyper::header::{HeaderMap, HeaderValue};
|
||||||
|
|
Loading…
Reference in a new issue