K2V: Fix #305 #306

Merged
lx merged 2 commits from fix-k2v-305 into main 2022-05-17 11:10:39 +00:00
Showing only changes of commit 7b474855e3 - Show all commits

View file

@ -6,7 +6,9 @@ use std::time::Duration;
use futures::future::*; use futures::future::*;
use futures::select; use futures::select;
use tokio::sync::{mpsc, watch, Mutex}; use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error; use crate::error::Error;
@ -30,26 +32,31 @@ impl BackgroundRunner {
let stop_signal_2 = stop_signal.clone(); let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move { let await_all_done = tokio::spawn(async move {
let mut workers = FuturesUnordered::new();
let mut shutdown_timer = 0;
loop { loop {
let wkr = { let closed = match worker_out.try_recv() {
select! { Ok(wkr) => {
item = worker_out.recv().fuse() => { workers.push(wkr);
match item { false
Some(x) => x,
None => break,
}
}
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
if *stop_signal_2.borrow() {
break;
} else {
continue;
}
}
} }
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
}; };
if let Err(e) = wkr.await { select! {
error!("Error while awaiting for worker: {}", e); res = workers.next() => {
if let Some(Err(e)) = res {
error!("Worker exited with error: {}", e);
}
}
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if closed || *stop_signal_2.borrow() {
shutdown_timer += 1;
if shutdown_timer >= 10 {
break;
}
}
}
} }
} }
}); });