Merge pull request 'api servers: kill opened connections after SIGINT after 10s deadline (fix #806)' (#864) from exit-deadline into main
Some checks failed
ci/woodpecker/push/debug Pipeline failed
ci/woodpecker/cron/release/3 Pipeline was successful
ci/woodpecker/cron/debug Pipeline was successful
ci/woodpecker/cron/release/2 Pipeline was successful
ci/woodpecker/cron/release/4 Pipeline was successful
ci/woodpecker/cron/release/1 Pipeline was successful
ci/woodpecker/cron/publish Pipeline was successful

Reviewed-on: #864
This commit is contained in:
Alex 2024-08-25 18:34:55 +00:00
commit 182b2af7e5

View file

@ -2,6 +2,7 @@ use std::convert::Infallible;
use std::fs::{self, Permissions}; use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};
use opentelemetry::{ use opentelemetry::{
global, global,
@ -291,7 +293,7 @@ where
let connection_collector = tokio::spawn({ let connection_collector = tokio::spawn({
let server_name = server_name.clone(); let server_name = server_name.clone();
async move { async move {
let mut connections = FuturesUnordered::new(); let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
loop { loop {
let collect_next = async { let collect_next = async {
if connections.is_empty() { if connections.is_empty() {
@ -312,23 +314,34 @@ where
} }
} }
} }
if !connections.is_empty() { let deadline = Instant::now() + Duration::from_secs(10);
while !connections.is_empty() {
info!( info!(
"{} server: {} connections still open", "{} server: {} connections still open, deadline in {:.2}s",
server_name, server_name,
connections.len() connections.len(),
(deadline - Instant::now()).as_secs_f32(),
); );
while let Some(conn_res) = connections.next().await { tokio::select! {
conn_res = connections.next() => {
trace!( trace!(
"{} server: HTTP connection finished: {:?}", "{} server: HTTP connection finished: {:?}",
server_name, server_name,
conn_res conn_res.unwrap(),
); );
info!( }
"{} server: {} connections still open", _ = sleep_until(deadline) => {
warn!("{} server: exit deadline reached with {} connections still open, killing them now",
server_name, server_name,
connections.len() connections.len());
); for conn in connections.iter() {
conn.abort();
}
for conn in connections {
assert!(conn.await.unwrap_err().is_cancelled());
}
break;
}
} }
} }
} }