diff --git a/src/block/manager.rs b/src/block/manager.rs index 32ba0431..8a131270 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -9,9 +9,9 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use futures::future::*; -use futures::select; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::select; use tokio::sync::{watch, Mutex, Notify}; use opentelemetry::{ @@ -22,6 +22,7 @@ use opentelemetry::{ use garage_db as db; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; @@ -110,6 +111,12 @@ pub struct BlockManager { // it INSIDE a Mutex. struct BlockManagerLocked(); +enum BlockIterResult { + BusyDidSomething, + BusyDidNothing, + IdleFor(Duration), +} + impl BlockManager { pub fn new( db: &db::Db, @@ -557,11 +564,14 @@ impl BlockManager { fn spawn_background_worker(self: Arc) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); + let worker = BlockResyncWorker { + manager: self, + tranquilizer: Tranquilizer::new(30), + next_delay: Duration::from_secs(10), + }; tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(10)).await; - background.spawn_worker("block resync worker".into(), move |must_exit| { - self.resync_loop(must_exit) - }); + background.spawn_worker(worker); }); } @@ -579,37 +589,7 @@ impl BlockManager { Ok(()) } - async fn resync_loop(self: Arc, mut must_exit: watch::Receiver) { - let mut tranquilizer = Tranquilizer::new(30); - - while !*must_exit.borrow() { - match self.resync_iter(&mut must_exit).await { - Ok(true) => { - tranquilizer.tranquilize(self.background_tranquility).await; - } - Ok(false) => { - tranquilizer.reset(); - } - Err(e) => { - // The errors that we have here are only Sled errors - // We don't really know how to handle them so just ¯\_(ツ)_/¯ - // (there is kind of an assumption that Sled won't error on us, - // if it does there is not much we can do -- TODO should we just panic?) - error!( - "Could not do a resync iteration: {} (this is a very bad error)", - e - ); - tranquilizer.reset(); - } - } - } - } - - // The result of resync_iter is: - // - Ok(true) -> a block was processed (successfully or not) - // - Ok(false) -> no block was processed, but we are ready for the next iteration - // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors - async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { + async fn resync_iter(&self) -> Result { if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? { let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); @@ -629,7 +609,7 @@ impl BlockManager { // (we want to do the remove after the insert to ensure // that the item is not lost if we crash in-between) self.resync_queue.remove(time_bytes)?; - return Ok(false); + return Ok(BlockIterResult::BusyDidNothing); } } @@ -676,15 +656,11 @@ impl BlockManager { self.resync_queue.remove(time_bytes)?; } - Ok(true) + Ok(BlockIterResult::BusyDidSomething) } else { - let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); - select! { - _ = delay.fuse() => {}, - _ = self.resync_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - Ok(false) + Ok(BlockIterResult::IdleFor(Duration::from_millis( + time_msec - now, + ))) } } else { // Here we wait either for a notification that an item has been @@ -693,13 +669,7 @@ impl BlockManager { // between the time we checked the queue and the first poll // to resync_notify.notified(): if that happens, we'll just loop // back 10 seconds later, which is fine. - let delay = tokio::time::sleep(Duration::from_secs(10)); - select! { - _ = delay.fuse() => {}, - _ = self.resync_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - Ok(false) + Ok(BlockIterResult::IdleFor(Duration::from_secs(10))) } } @@ -898,6 +868,58 @@ impl EndpointHandler for BlockManager { } } +struct BlockResyncWorker { + manager: Arc, + tranquilizer: Tranquilizer, + next_delay: Duration, +} + +#[async_trait] +impl Worker for BlockResyncWorker { + fn name(&self) -> String { + "Block resync worker".into() + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { + self.tranquilizer.reset(); + match self.manager.resync_iter().await { + Ok(BlockIterResult::BusyDidSomething) => { + self.tranquilizer + .tranquilize(self.manager.background_tranquility) + .await; + Ok(WorkerStatus::Busy) + } + Ok(BlockIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy), + Ok(BlockIterResult::IdleFor(delay)) => { + self.next_delay = delay; + Ok(WorkerStatus::Idle) + } + Err(e) => { + // The errors that we have here are only Sled errors + // We don't really know how to handle them so just ¯\_(ツ)_/¯ + // (there is kind of an assumption that Sled won't error on us, + // if it does there is not much we can do -- TODO should we just panic?) + error!( + "Could not do a resync iteration: {} (this is a very bad error)", + e + ); + Err(e.into()) + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { + select! { + _ = tokio::time::sleep(self.next_delay) => (), + _ = self.manager.resync_notify.notified() => (), + }; + WorkerStatus::Busy + } +} + struct BlockStatus { exists: bool, needed: RcEntry,