api servers: kill opened connections after SIGINT after 10s deadline (fix #806) #864
1 changed files with 28 additions and 15 deletions
|
@ -2,6 +2,7 @@ use std::convert::Infallible;
|
||||||
use std::fs::{self, Permissions};
|
use std::fs::{self, Permissions};
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -19,6 +20,7 @@ use hyper_util::rt::TokioIo;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
|
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
use tokio::time::{sleep_until, Instant};
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
global,
|
global,
|
||||||
|
@ -291,7 +293,7 @@ where
|
||||||
let connection_collector = tokio::spawn({
|
let connection_collector = tokio::spawn({
|
||||||
let server_name = server_name.clone();
|
let server_name = server_name.clone();
|
||||||
async move {
|
async move {
|
||||||
let mut connections = FuturesUnordered::new();
|
let mut connections = FuturesUnordered::<tokio::task::JoinHandle<()>>::new();
|
||||||
loop {
|
loop {
|
||||||
let collect_next = async {
|
let collect_next = async {
|
||||||
if connections.is_empty() {
|
if connections.is_empty() {
|
||||||
|
@ -312,23 +314,34 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !connections.is_empty() {
|
let deadline = Instant::now() + Duration::from_secs(10);
|
||||||
|
while !connections.is_empty() {
|
||||||
info!(
|
info!(
|
||||||
"{} server: {} connections still open",
|
"{} server: {} connections still open, deadline in {:.2}s",
|
||||||
server_name,
|
server_name,
|
||||||
connections.len()
|
connections.len(),
|
||||||
|
(deadline - Instant::now()).as_secs_f32(),
|
||||||
);
|
);
|
||||||
while let Some(conn_res) = connections.next().await {
|
tokio::select! {
|
||||||
trace!(
|
conn_res = connections.next() => {
|
||||||
"{} server: HTTP connection finished: {:?}",
|
trace!(
|
||||||
server_name,
|
"{} server: HTTP connection finished: {:?}",
|
||||||
conn_res
|
server_name,
|
||||||
);
|
conn_res.unwrap(),
|
||||||
info!(
|
);
|
||||||
"{} server: {} connections still open",
|
}
|
||||||
server_name,
|
_ = sleep_until(deadline) => {
|
||||||
connections.len()
|
warn!("{} server: exit deadline reached with {} connections still open, killing them now",
|
||||||
);
|
server_name,
|
||||||
|
connections.len());
|
||||||
|
for conn in connections.iter() {
|
||||||
|
conn.abort();
|
||||||
|
}
|
||||||
|
for conn in connections {
|
||||||
|
assert!(conn.await.unwrap_err().is_cancelled());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue