api servers: kill opened connections after SIGINT after 10s deadline (fix #806) #864

Merged
lx merged 1 commit from exit-deadline into main 2024-08-25 18:34:56 +00:00

View file

@ -2,6 +2,7 @@ use std::convert::Infallible;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};
use opentelemetry::{
global,
@ -291,7 +293,7 @@ where
let connection_collector = tokio::spawn({
let server_name = server_name.clone();
async move {
let mut connections = FuturesUnordered::new();
let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
loop {
let collect_next = async {
if connections.is_empty() {
@ -312,23 +314,34 @@ where
}
}
}
if !connections.is_empty() {
let deadline = Instant::now() + Duration::from_secs(10);
while !connections.is_empty() {
info!(
"{} server: {} connections still open",
"{} server: {} connections still open, deadline in {:.2}s",
server_name,
connections.len()
connections.len(),
(deadline - Instant::now()).as_secs_f32(),
);
while let Some(conn_res) = connections.next().await {
tokio::select! {
conn_res = connections.next() => {
trace!(
"{} server: HTTP connection finished: {:?}",
server_name,
conn_res
conn_res.unwrap(),
);
info!(
"{} server: {} connections still open",
}
_ = sleep_until(deadline) => {
warn!("{} server: exit deadline reached with {} connections still open, killing them now",
server_name,
connections.len()
);
connections.len());
for conn in connections.iter() {
conn.abort();
}
for conn in connections {
assert!(conn.await.unwrap_err().is_cancelled());
}
break;
}
}
}
}