api servers: kill opened connections after SIGINT after 10s deadline (fix #806) #864
1 changed files with 28 additions and 15 deletions
|
@ -2,6 +2,7 @@ use std::convert::Infallible;
|
|||
use std::fs::{self, Permissions};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
|
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
|
|||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::{sleep_until, Instant};
|
||||
|
||||
use opentelemetry::{
|
||||
global,
|
||||
|
@ -291,7 +293,7 @@ where
|
|||
let connection_collector = tokio::spawn({
|
||||
let server_name = server_name.clone();
|
||||
async move {
|
||||
let mut connections = FuturesUnordered::new();
|
||||
let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
|
||||
loop {
|
||||
let collect_next = async {
|
||||
if connections.is_empty() {
|
||||
|
@ -312,23 +314,34 @@ where
|
|||
}
|
||||
}
|
||||
}
|
||||
if !connections.is_empty() {
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
while !connections.is_empty() {
|
||||
info!(
|
||||
"{} server: {} connections still open",
|
||||
"{} server: {} connections still open, deadline in {:.2}s",
|
||||
server_name,
|
||||
connections.len()
|
||||
connections.len(),
|
||||
(deadline - Instant::now()).as_secs_f32(),
|
||||
);
|
||||
while let Some(conn_res) = connections.next().await {
|
||||
tokio::select! {
|
||||
conn_res = connections.next() => {
|
||||
trace!(
|
||||
"{} server: HTTP connection finished: {:?}",
|
||||
server_name,
|
||||
conn_res
|
||||
conn_res.unwrap(),
|
||||
);
|
||||
info!(
|
||||
"{} server: {} connections still open",
|
||||
}
|
||||
_ = sleep_until(deadline) => {
|
||||
warn!("{} server: exit deadline reached with {} connections still open, killing them now",
|
||||
server_name,
|
||||
connections.len()
|
||||
);
|
||||
connections.len());
|
||||
for conn in connections.iter() {
|
||||
conn.abort();
|
||||
}
|
||||
for conn in connections {
|
||||
assert!(conn.await.unwrap_err().is_cancelled());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue