Make background runner terminate correctly

This commit is contained in:
Alex 2022-05-05 10:56:44 +02:00
parent 633958c7b1
commit 99fcfa3844
Signed by: lx
GPG key ID: 0E496D15096376BE
2 changed files with 23 additions and 15 deletions

View file

@ -110,6 +110,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
opentelemetry::global::shutdown_tracer_provider();
// Await for netapp RPC system to end
run_system.await?;

View file

@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*;
use futures::select;
use tokio::sync::{mpsc, watch, Mutex};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
let mut workers = FuturesUnordered::new();
let mut shutdown_timer = 0;
loop {
let wkr = {
select! {
item = worker_out.recv().fuse() => {
match item {
Some(x) => x,
None => break,
}
let closed = match worker_out.try_recv() {
Ok(wkr) => {
workers.push(wkr);
false
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
};
select! {
res = workers.next() => {
if let Some(Err(e)) = res {
error!("Worker exited with error: {}", e);
}
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
if *stop_signal_2.borrow() {
}
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if closed || *stop_signal_2.borrow() {
shutdown_timer += 1;
if shutdown_timer >= 10 {
break;
} else {
continue;
}
}
}
};
if let Err(e) = wkr.await {
error!("Error while awaiting for worker: {}", e);
}
}
});