Compare commits

..

2 commits

Author SHA1 Message Date
182b2af7e5 Merge pull request 'api servers: kill opened connections after SIGINT after 10s deadline (fix #806)' (#864) from exit-deadline into main
Some checks failed
ci/woodpecker/push/debug Pipeline failed
ci/woodpecker/cron/release/3 Pipeline was successful
ci/woodpecker/cron/debug Pipeline was successful
ci/woodpecker/cron/release/2 Pipeline was successful
ci/woodpecker/cron/release/4 Pipeline was successful
ci/woodpecker/cron/release/1 Pipeline was successful
ci/woodpecker/cron/publish Pipeline was successful
Reviewed-on: #864
2024-08-25 18:34:55 +00:00
baf32c9575
api servers: kill opened connections after SIGINT after 10s deadline (fix #806)
All checks were successful
ci/woodpecker/push/debug Pipeline was successful
ci/woodpecker/pr/debug Pipeline was successful
2024-08-25 20:04:56 +02:00

View file

@ -2,6 +2,7 @@ use std::convert::Infallible;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};
use opentelemetry::{
global,
@ -291,7 +293,7 @@ where
let connection_collector = tokio::spawn({
let server_name = server_name.clone();
async move {
let mut connections = FuturesUnordered::new();
let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
loop {
let collect_next = async {
if connections.is_empty() {
@ -312,23 +314,34 @@ where
}
}
}
if !connections.is_empty() {
let deadline = Instant::now() + Duration::from_secs(10);
while !connections.is_empty() {
info!(
"{} server: {} connections still open",
"{} server: {} connections still open, deadline in {:.2}s",
server_name,
connections.len()
connections.len(),
(deadline - Instant::now()).as_secs_f32(),
);
while let Some(conn_res) = connections.next().await {
trace!(
"{} server: HTTP connection finished: {:?}",
server_name,
conn_res
);
info!(
"{} server: {} connections still open",
server_name,
connections.len()
);
tokio::select! {
conn_res = connections.next() => {
trace!(
"{} server: HTTP connection finished: {:?}",
server_name,
conn_res.unwrap(),
);
}
_ = sleep_until(deadline) => {
warn!("{} server: exit deadline reached with {} connections still open, killing them now",
server_name,
connections.len());
for conn in connections.iter() {
conn.abort();
}
for conn in connections {
assert!(conn.await.unwrap_err().is_cancelled());
}
break;
}
}
}
}