Add graceful shutdown and memory tracing
This commit is contained in:
parent
d7511c683d
commit
5e5299a6d0
8 changed files with 258 additions and 39 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,2 +1,3 @@
|
||||||
/target
|
/target
|
||||||
run_local.sh
|
run_local.sh
|
||||||
|
dhat-heap.json
|
||||||
|
|
96
Cargo.lock
generated
96
Cargo.lock
generated
|
@ -370,6 +370,21 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dhat"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "47003dc9f6368a88e85956c3b2573a7e6872746a3e5d762a8885da3a136a0381"
|
||||||
|
dependencies = [
|
||||||
|
"backtrace",
|
||||||
|
"lazy_static",
|
||||||
|
"parking_lot 0.11.2",
|
||||||
|
"rustc-hash",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thousands",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "discard"
|
name = "discard"
|
||||||
version = "1.0.4"
|
version = "1.0.4"
|
||||||
|
@ -851,6 +866,15 @@ dependencies = [
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "instant"
|
||||||
|
version = "0.1.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "iovec"
|
name = "iovec"
|
||||||
version = "0.1.4"
|
version = "0.1.4"
|
||||||
|
@ -921,6 +945,15 @@ dependencies = [
|
||||||
"scopeguard",
|
"scopeguard",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lock_api"
|
||||||
|
version = "0.4.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
|
||||||
|
dependencies = [
|
||||||
|
"scopeguard",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "log"
|
name = "log"
|
||||||
version = "0.4.14"
|
version = "0.4.14"
|
||||||
|
@ -1129,11 +1162,22 @@ version = "0.9.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
|
checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"lock_api",
|
"lock_api 0.3.4",
|
||||||
"parking_lot_core",
|
"parking_lot_core 0.6.2",
|
||||||
"rustc_version",
|
"rustc_version",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot"
|
||||||
|
version = "0.11.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
|
||||||
|
dependencies = [
|
||||||
|
"instant",
|
||||||
|
"lock_api 0.4.5",
|
||||||
|
"parking_lot_core 0.8.5",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "parking_lot_core"
|
name = "parking_lot_core"
|
||||||
version = "0.6.2"
|
version = "0.6.2"
|
||||||
|
@ -1143,9 +1187,23 @@ dependencies = [
|
||||||
"cfg-if 0.1.10",
|
"cfg-if 0.1.10",
|
||||||
"cloudabi",
|
"cloudabi",
|
||||||
"libc",
|
"libc",
|
||||||
"redox_syscall",
|
"redox_syscall 0.1.57",
|
||||||
"rustc_version",
|
"rustc_version",
|
||||||
"smallvec",
|
"smallvec 0.6.14",
|
||||||
|
"winapi 0.3.9",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot_core"
|
||||||
|
version = "0.8.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"instant",
|
||||||
|
"libc",
|
||||||
|
"redox_syscall 0.2.10",
|
||||||
|
"smallvec 1.8.0",
|
||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1285,6 +1343,15 @@ version = "0.1.57"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
|
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "redox_syscall"
|
||||||
|
version = "0.2.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.5.4"
|
version = "1.5.4"
|
||||||
|
@ -1359,6 +1426,12 @@ version = "0.1.21"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
|
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-hash"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc_version"
|
name = "rustc_version"
|
||||||
version = "0.2.3"
|
version = "0.2.3"
|
||||||
|
@ -1567,6 +1640,12 @@ dependencies = [
|
||||||
"maybe-uninit",
|
"maybe-uninit",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "smallvec"
|
||||||
|
version = "1.8.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "socket2"
|
name = "socket2"
|
||||||
version = "0.4.2"
|
version = "0.4.2"
|
||||||
|
@ -1721,6 +1800,12 @@ dependencies = [
|
||||||
"unicode-width",
|
"unicode-width",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thousands"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.1.43"
|
version = "0.1.43"
|
||||||
|
@ -1886,7 +1971,7 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"mio 0.6.23",
|
"mio 0.6.23",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"parking_lot",
|
"parking_lot 0.9.0",
|
||||||
"slab",
|
"slab",
|
||||||
"tokio-executor",
|
"tokio-executor",
|
||||||
"tokio-io",
|
"tokio-io",
|
||||||
|
@ -2007,6 +2092,7 @@ dependencies = [
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"bytes 1.1.0",
|
"bytes 1.1.0",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"dhat",
|
||||||
"envy",
|
"envy",
|
||||||
"futures 0.3.18",
|
"futures 0.3.18",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
|
|
@ -35,3 +35,10 @@ accept-encoding-fork = "0.2.0-alpha.3"
|
||||||
async-compression = { version = "0.3", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] }
|
async-compression = { version = "0.3", features = ["tokio", "gzip", "zstd", "deflate", "brotli"] }
|
||||||
tokio-util = { version = "0.6", features = ["io"] }
|
tokio-util = { version = "0.6", features = ["io"] }
|
||||||
uuid = { version = "0.8.2", features = ["v4"] }
|
uuid = { version = "0.8.2", features = ["v4"] }
|
||||||
|
dhat = { version = "0.3", optional = true }
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
debug = 1
|
||||||
|
|
||||||
|
[features]
|
||||||
|
dhat-heap = [ "dhat" ]
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use futures::TryFutureExt;
|
use futures::{FutureExt, TryFutureExt};
|
||||||
use log::*;
|
use log::*;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
@ -16,7 +16,6 @@ use rustls::sign::CertifiedKey;
|
||||||
|
|
||||||
use crate::cert::{Cert, CertSer};
|
use crate::cert::{Cert, CertSer};
|
||||||
use crate::consul::*;
|
use crate::consul::*;
|
||||||
use crate::exit_on_err;
|
|
||||||
use crate::proxy_config::*;
|
use crate::proxy_config::*;
|
||||||
|
|
||||||
pub struct CertStore {
|
pub struct CertStore {
|
||||||
|
@ -33,6 +32,7 @@ impl CertStore {
|
||||||
consul: Consul,
|
consul: Consul,
|
||||||
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
letsencrypt_email: String,
|
letsencrypt_email: String,
|
||||||
|
exit_on_err: impl Fn(anyhow::Error) + Send + 'static,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
@ -45,7 +45,13 @@ impl CertStore {
|
||||||
tx_need_cert: tx,
|
tx_need_cert: tx,
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(cert_store.clone().certificate_loop(rx).map_err(exit_on_err));
|
tokio::spawn(
|
||||||
|
cert_store
|
||||||
|
.clone()
|
||||||
|
.certificate_loop(rx)
|
||||||
|
.map_err(exit_on_err)
|
||||||
|
.then(|_| async { info!("Certificate renewal task exited") }),
|
||||||
|
);
|
||||||
|
|
||||||
cert_store
|
cert_store
|
||||||
}
|
}
|
||||||
|
|
11
src/http.rs
11
src/http.rs
|
@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
|
use futures::future::Future;
|
||||||
use http::uri::Authority;
|
use http::uri::Authority;
|
||||||
use hyper::service::{make_service_fn, service_fn};
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::{Body, Request, Response, Server, StatusCode, Uri};
|
use hyper::{Body, Request, Response, Server, StatusCode, Uri};
|
||||||
|
@ -12,7 +13,11 @@ use crate::consul::Consul;
|
||||||
|
|
||||||
const CHALLENGE_PREFIX: &str = "/.well-known/acme-challenge/";
|
const CHALLENGE_PREFIX: &str = "/.well-known/acme-challenge/";
|
||||||
|
|
||||||
pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> {
|
pub async fn serve_http(
|
||||||
|
bind_addr: SocketAddr,
|
||||||
|
consul: Consul,
|
||||||
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
|
) -> Result<()> {
|
||||||
let consul = Arc::new(consul);
|
let consul = Arc::new(consul);
|
||||||
// For every connection, we must make a `Service` to handle all
|
// For every connection, we must make a `Service` to handle all
|
||||||
// incoming HTTP requests on said connection.
|
// incoming HTTP requests on said connection.
|
||||||
|
@ -30,7 +35,9 @@ pub async fn serve_http(bind_addr: SocketAddr, consul: Consul) -> Result<()> {
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("Listening on http://{}", bind_addr);
|
info!("Listening on http://{}", bind_addr);
|
||||||
let server = Server::bind(&bind_addr).serve(make_svc);
|
let server = Server::bind(&bind_addr)
|
||||||
|
.serve(make_svc)
|
||||||
|
.with_graceful_shutdown(shutdown_signal);
|
||||||
|
|
||||||
server.await?;
|
server.await?;
|
||||||
|
|
||||||
|
|
42
src/https.rs
42
src/https.rs
|
@ -7,6 +7,7 @@ use log::*;
|
||||||
|
|
||||||
use accept_encoding_fork::Encoding;
|
use accept_encoding_fork::Encoding;
|
||||||
use async_compression::tokio::bufread::*;
|
use async_compression::tokio::bufread::*;
|
||||||
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use http::header::{HeaderName, HeaderValue};
|
use http::header::{HeaderName, HeaderValue};
|
||||||
|
@ -15,6 +16,7 @@ use hyper::server::conn::Http;
|
||||||
use hyper::service::service_fn;
|
use hyper::service::service_fn;
|
||||||
use hyper::{header, Body, Request, Response, StatusCode};
|
use hyper::{header, Body, Request, Response, StatusCode};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
use tokio_util::io::{ReaderStream, StreamReader};
|
use tokio_util::io::{ReaderStream, StreamReader};
|
||||||
|
@ -33,6 +35,7 @@ pub async fn serve_https(
|
||||||
config: HttpsConfig,
|
config: HttpsConfig,
|
||||||
cert_store: Arc<CertStore>,
|
cert_store: Arc<CertStore>,
|
||||||
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let config = Arc::new(config);
|
let config = Arc::new(config);
|
||||||
|
|
||||||
|
@ -47,28 +50,43 @@ pub async fn serve_https(
|
||||||
info!("Starting to serve on https://{}.", config.bind_addr);
|
info!("Starting to serve on https://{}.", config.bind_addr);
|
||||||
|
|
||||||
let tcp = TcpListener::bind(config.bind_addr).await?;
|
let tcp = TcpListener::bind(config.bind_addr).await?;
|
||||||
loop {
|
let mut connections = FuturesUnordered::new();
|
||||||
let (socket, remote_addr) = tcp.accept().await?;
|
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let (socket, remote_addr) = select! {
|
||||||
|
a = tcp.accept() => a?,
|
||||||
|
_ = connections.next() => continue,
|
||||||
|
_ = must_exit.changed() => continue,
|
||||||
|
};
|
||||||
|
|
||||||
let rx_proxy_config = rx_proxy_config.clone();
|
let rx_proxy_config = rx_proxy_config.clone();
|
||||||
let tls_acceptor = tls_acceptor.clone();
|
let tls_acceptor = tls_acceptor.clone();
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
let mut must_exit_2 = must_exit.clone();
|
||||||
|
let conn = tokio::spawn(async move {
|
||||||
match tls_acceptor.accept(socket).await {
|
match tls_acceptor.accept(socket).await {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
debug!("TLS handshake was successfull");
|
debug!("TLS handshake was successfull");
|
||||||
let http_result = Http::new()
|
let http_conn = Http::new().serve_connection(
|
||||||
.serve_connection(
|
|
||||||
stream,
|
stream,
|
||||||
service_fn(move |req: Request<Body>| {
|
service_fn(move |req: Request<Body>| {
|
||||||
let https_config = config.clone();
|
let https_config = config.clone();
|
||||||
let proxy_config: Arc<ProxyConfig> =
|
let proxy_config: Arc<ProxyConfig> = rx_proxy_config.borrow().clone();
|
||||||
rx_proxy_config.borrow().clone();
|
|
||||||
handle_outer(remote_addr, req, https_config, proxy_config)
|
handle_outer(remote_addr, req, https_config, proxy_config)
|
||||||
}),
|
}),
|
||||||
|
);
|
||||||
|
tokio::pin!(http_conn);
|
||||||
|
let http_result = loop {
|
||||||
|
select! (
|
||||||
|
r = &mut http_conn => break r,
|
||||||
|
_ = must_exit_2.changed() => {
|
||||||
|
if *must_exit_2.borrow() {
|
||||||
|
http_conn.as_mut().graceful_shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
)
|
)
|
||||||
.await;
|
};
|
||||||
if let Err(http_err) = http_result {
|
if let Err(http_err) = http_result {
|
||||||
warn!("HTTP error: {}", http_err);
|
warn!("HTTP error: {}", http_err);
|
||||||
}
|
}
|
||||||
|
@ -76,7 +94,15 @@ pub async fn serve_https(
|
||||||
Err(e) => warn!("Error in TLS connection: {}", e),
|
Err(e) => warn!("Error in TLS connection: {}", e),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
connections.push(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("HTTPS server shutting down, draining remaining connections...");
|
||||||
|
while !connections.is_empty() {
|
||||||
|
let _ = connections.next().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_outer(
|
async fn handle_outer(
|
||||||
|
|
95
src/main.rs
95
src/main.rs
|
@ -1,9 +1,14 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate anyhow;
|
extern crate anyhow;
|
||||||
|
|
||||||
use futures::TryFutureExt;
|
use log::*;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use futures::{FutureExt, TryFutureExt};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
mod cert;
|
mod cert;
|
||||||
mod cert_store;
|
mod cert_store;
|
||||||
|
@ -14,7 +19,11 @@ mod proxy_config;
|
||||||
mod reverse_proxy;
|
mod reverse_proxy;
|
||||||
mod tls_util;
|
mod tls_util;
|
||||||
|
|
||||||
use log::*;
|
use proxy_config::ProxyConfig;
|
||||||
|
|
||||||
|
#[cfg(feature = "dhat-heap")]
|
||||||
|
#[global_allocator]
|
||||||
|
static ALLOC: dhat::Alloc = dhat::Alloc;
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
#[structopt(name = "tricot")]
|
#[structopt(name = "tricot")]
|
||||||
|
@ -86,6 +95,9 @@ struct Opt {
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
#[cfg(feature = "dhat-heap")]
|
||||||
|
let _profiler = dhat::Profiler::new_heap();
|
||||||
|
|
||||||
if std::env::var("RUST_LOG").is_err() {
|
if std::env::var("RUST_LOG").is_err() {
|
||||||
std::env::set_var("RUST_LOG", "tricot=info")
|
std::env::set_var("RUST_LOG", "tricot=info")
|
||||||
}
|
}
|
||||||
|
@ -101,6 +113,12 @@ async fn main() {
|
||||||
|
|
||||||
info!("Starting Tricot");
|
info!("Starting Tricot");
|
||||||
|
|
||||||
|
let (exit_signal, provoke_exit) = watch_ctrl_c();
|
||||||
|
let exit_on_err = move |err: anyhow::Error| {
|
||||||
|
error!("Error: {}", err);
|
||||||
|
let _ = provoke_exit.send(true);
|
||||||
|
};
|
||||||
|
|
||||||
let consul_config = consul::ConsulConfig {
|
let consul_config = consul::ConsulConfig {
|
||||||
addr: opt.consul_addr.clone(),
|
addr: opt.consul_addr.clone(),
|
||||||
ca_cert: opt.consul_ca_cert.clone(),
|
ca_cert: opt.consul_ca_cert.clone(),
|
||||||
|
@ -110,15 +128,25 @@ async fn main() {
|
||||||
|
|
||||||
let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name)
|
let consul = consul::Consul::new(consul_config, &opt.consul_kv_prefix, &opt.node_name)
|
||||||
.expect("Error creating Consul client");
|
.expect("Error creating Consul client");
|
||||||
let mut rx_proxy_config = proxy_config::spawn_proxy_config_task(consul.clone());
|
let rx_proxy_config =
|
||||||
|
proxy_config::spawn_proxy_config_task(consul.clone(), exit_signal.clone());
|
||||||
|
|
||||||
let cert_store = cert_store::CertStore::new(
|
let cert_store = cert_store::CertStore::new(
|
||||||
consul.clone(),
|
consul.clone(),
|
||||||
rx_proxy_config.clone(),
|
rx_proxy_config.clone(),
|
||||||
opt.letsencrypt_email.clone(),
|
opt.letsencrypt_email.clone(),
|
||||||
|
exit_on_err.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
tokio::spawn(http::serve_http(opt.http_bind_addr, consul.clone()).map_err(exit_on_err));
|
let http_task = tokio::spawn(
|
||||||
|
http::serve_http(
|
||||||
|
opt.http_bind_addr,
|
||||||
|
consul.clone(),
|
||||||
|
wait_from(exit_signal.clone()),
|
||||||
|
)
|
||||||
|
.map_err(exit_on_err.clone())
|
||||||
|
.then(|_| async { info!("HTTP server exited") }),
|
||||||
|
);
|
||||||
|
|
||||||
let https_config = https::HttpsConfig {
|
let https_config = https::HttpsConfig {
|
||||||
bind_addr: opt.https_bind_addr,
|
bind_addr: opt.https_bind_addr,
|
||||||
|
@ -129,12 +157,38 @@ async fn main() {
|
||||||
.map(|x| x.to_string())
|
.map(|x| x.to_string())
|
||||||
.collect(),
|
.collect(),
|
||||||
};
|
};
|
||||||
tokio::spawn(
|
|
||||||
https::serve_https(https_config, cert_store.clone(), rx_proxy_config.clone())
|
let https_task = tokio::spawn(
|
||||||
.map_err(exit_on_err),
|
https::serve_https(
|
||||||
|
https_config,
|
||||||
|
cert_store.clone(),
|
||||||
|
rx_proxy_config.clone(),
|
||||||
|
exit_signal.clone(),
|
||||||
|
)
|
||||||
|
.map_err(exit_on_err.clone())
|
||||||
|
.then(|_| async { info!("HTTPS server exited") }),
|
||||||
);
|
);
|
||||||
|
|
||||||
while rx_proxy_config.changed().await.is_ok() {
|
let dump_task = tokio::spawn(dump_config_on_change(rx_proxy_config, exit_signal.clone()));
|
||||||
|
|
||||||
|
let _ = http_task.await.expect("Tokio task await failure");
|
||||||
|
let _ = https_task.await.expect("Tokio task await failure");
|
||||||
|
let _ = dump_task.await.expect("Tokio task await failure");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dump_config_on_change(
|
||||||
|
mut rx_proxy_config: watch::Receiver<Arc<ProxyConfig>>,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
) {
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
select!(
|
||||||
|
c = rx_proxy_config.changed() => {
|
||||||
|
if !c.is_ok() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = must_exit.changed() => continue,
|
||||||
|
);
|
||||||
println!("---- PROXY CONFIGURATION ----");
|
println!("---- PROXY CONFIGURATION ----");
|
||||||
for ent in rx_proxy_config.borrow().entries.iter() {
|
for ent in rx_proxy_config.borrow().entries.iter() {
|
||||||
println!(" {}", ent);
|
println!(" {}", ent);
|
||||||
|
@ -143,7 +197,26 @@ async fn main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn exit_on_err(e: anyhow::Error) {
|
/// Creates a watch that contains `false`, and that changes
|
||||||
error!("{}", e);
|
/// to `true` when a Ctrl+C signal is received.
|
||||||
std::process::exit(1);
|
pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
|
||||||
|
let (send_cancel, watch_cancel) = watch::channel(false);
|
||||||
|
let send_cancel = Arc::new(send_cancel);
|
||||||
|
let send_cancel_2 = send_cancel.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::signal::ctrl_c()
|
||||||
|
.await
|
||||||
|
.expect("failed to install CTRL+C signal handler");
|
||||||
|
info!("Received CTRL+C, shutting down.");
|
||||||
|
send_cancel.send(true).unwrap();
|
||||||
|
});
|
||||||
|
(watch_cancel, send_cancel_2)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_from(mut chan: watch::Receiver<bool>) {
|
||||||
|
while !*chan.borrow() {
|
||||||
|
if chan.changed().await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ use futures::future::BoxFuture;
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
use tokio::{sync::watch, time::sleep};
|
use tokio::{select, sync::watch, time::sleep};
|
||||||
|
|
||||||
use crate::consul::*;
|
use crate::consul::*;
|
||||||
|
|
||||||
|
@ -231,7 +231,10 @@ struct NodeWatchState {
|
||||||
retries: u32,
|
retries: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfig>> {
|
pub fn spawn_proxy_config_task(
|
||||||
|
consul: Consul,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
) -> watch::Receiver<Arc<ProxyConfig>> {
|
||||||
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
|
let (tx, rx) = watch::channel(Arc::new(ProxyConfig {
|
||||||
entries: Vec::new(),
|
entries: Vec::new(),
|
||||||
}));
|
}));
|
||||||
|
@ -244,8 +247,13 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
|
||||||
|
|
||||||
let mut node_site = HashMap::new();
|
let mut node_site = HashMap::new();
|
||||||
|
|
||||||
loop {
|
while !*must_exit.borrow() {
|
||||||
match consul.list_nodes().await {
|
let list_nodes = select! {
|
||||||
|
ln = consul.list_nodes() => ln,
|
||||||
|
_ = must_exit.changed() => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
match list_nodes {
|
||||||
Ok(consul_nodes) => {
|
Ok(consul_nodes) => {
|
||||||
info!("Watched consul nodes: {:?}", consul_nodes);
|
info!("Watched consul nodes: {:?}", consul_nodes);
|
||||||
for consul_node in consul_nodes {
|
for consul_node in consul_nodes {
|
||||||
|
@ -271,7 +279,12 @@ pub fn spawn_proxy_config_task(consul: Consul) -> watch::Receiver<Arc<ProxyConfi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (node, res): (String, Result<_>) = match watches.next().await {
|
let next_watch = select! {
|
||||||
|
nw = watches.next() => nw,
|
||||||
|
_ = must_exit.changed() => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (node, res): (String, Result<_>) = match next_watch {
|
||||||
Some(v) => v,
|
Some(v) => v,
|
||||||
None => {
|
None => {
|
||||||
warn!("No nodes currently watched in proxy_config.rs");
|
warn!("No nodes currently watched in proxy_config.rs");
|
||||||
|
|
Loading…
Reference in a new issue