api servers: kill opened connections after SIGINT after 10s deadline (fix #806) #864

Merged
lx merged 1 commit from exit-deadline into main 2024-08-25 18:34:56 +00:00

View file

@ -2,6 +2,7 @@ use std::convert::Infallible;
use std::fs::{self, Permissions}; use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};
use opentelemetry::{ use opentelemetry::{
global, global,
@ -291,7 +293,7 @@ where
let connection_collector = tokio::spawn({ let connection_collector = tokio::spawn({
let server_name = server_name.clone(); let server_name = server_name.clone();
async move { async move {
let mut connections = FuturesUnordered::new(); let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
loop { loop {
let collect_next = async { let collect_next = async {
if connections.is_empty() { if connections.is_empty() {
@ -312,23 +314,34 @@ where
} }
} }
} }
if !connections.is_empty() { let deadline = Instant::now() + Duration::from_secs(10);
while !connections.is_empty() {
info!( info!(
"{} server: {} connections still open", "{} server: {} connections still open, deadline in {:.2}s",
server_name, server_name,
connections.len() connections.len(),
(deadline - Instant::now()).as_secs_f32(),
); );
while let Some(conn_res) = connections.next().await { tokio::select! {
trace!( conn_res = connections.next() => {
"{} server: HTTP connection finished: {:?}", trace!(
server_name, "{} server: HTTP connection finished: {:?}",
conn_res server_name,
); conn_res.unwrap(),
info!( );
"{} server: {} connections still open", }
server_name, _ = sleep_until(deadline) => {
connections.len() warn!("{} server: exit deadline reached with {} connections still open, killing them now",
); server_name,
connections.len());
for conn in connections.iter() {
conn.abort();
}
for conn in connections {
assert!(conn.await.unwrap_err().is_cancelled());
}
break;
}
} }
} }
} }