Refactor block resync loop; make workers infaillible
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Alex 2021-03-15 20:09:44 +01:00
parent 667e4e72a8
commit 4d4117f2b4
7 changed files with 49 additions and 47 deletions

View File

@ -20,6 +20,16 @@ impl Repair {
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
}
}
async fn repair_worker_aux(
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);

View File

@ -258,46 +258,48 @@ impl BlockManager {
async fn resync_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut n_failures = 0usize;
) {
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
tokio::time::delay_for(Duration::from_secs(10)).await;
}
}
}
if let Err(e) = self.resync_iter(&hash).await {
warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
n_failures += 1;
if n_failures >= 10 {
warn!("Too many resync failures, throttling.");
tokio::time::delay_for(Duration::from_secs(1)).await;
}
} else {
n_failures = 0;
}
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
if let Some(first_item) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_item?;
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
if let Err(e) = &res {
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
self.resync_queue.remove(&time_bytes)?;
res?; // propagate error to delay main loop
} else {
let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
Ok(())
}
async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let lock = self.data_dir_lock.lock().await;
let path = self.block_path(hash);

View File

@ -318,9 +318,7 @@ impl System {
let self2 = self.clone();
self.clone()
.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal).map(Ok)
});
.spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal));
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
@ -329,7 +327,6 @@ impl System {
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2
.consul_loop(stop_signal, consul_host, consul_service_name)
.map(Ok)
});
}
}

View File

@ -70,7 +70,7 @@ where
gc
}
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
@ -89,7 +89,6 @@ where
_ = must_exit.recv().fuse() => (),
}
}
Ok(())
}
async fn gc_loop_iter(&self) -> Result<bool, Error> {

View File

@ -104,7 +104,7 @@ impl MerkleUpdater {
async fn updater_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
) {
while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() {
match x {
@ -131,7 +131,6 @@ impl MerkleUpdater {
}
}
}
Ok(())
}
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {

View File

@ -136,7 +136,7 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
) {
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
@ -183,7 +183,6 @@ where
}
}
}
Ok(())
}
pub fn add_full_sync(&self) {
@ -197,11 +196,11 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
) {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
busy_tx.send(true)?;
busy_tx.send(true).unwrap();
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
@ -213,11 +212,10 @@ where
);
}
} else {
busy_tx.send(false)?;
busy_tx.send(false).unwrap();
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}
Ok(())
}
async fn sync_partition(

View File

@ -76,16 +76,13 @@ impl BackgroundRunner {
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = JobOutput> + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await {
error!("Worker stopped with error: {}, error: {}", name, e);
} else {
info!("Worker exited successfully: {}", name);
}
worker(stop_signal).await;
info!("Worker exited: {}", name);
}));
}