diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 620be9e..0df6ef8 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -285,9 +285,9 @@ pub enum RepairWhat { /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) #[structopt(name = "scrub")] Scrub { - /// Limit on i/o speed, in B/s - #[structopt(name = "limit")] - limit: Option, + /// Tranquility factor (see tranquilizer documentation) + #[structopt(name = "tranquility", default_value = "2")] + tranquility: u32, }, } diff --git a/src/garage/repair.rs b/src/garage/repair.rs index bfe7bf8..a786f1f 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -51,11 +51,11 @@ impl Repair { .repair_data_store(&must_exit) .await?; } - RepairWhat::Scrub { limit } => { + RepairWhat::Scrub { tranquility } => { info!("Verifying integrity of stored blocks"); self.garage .block_manager - .scrub_data_store(&must_exit, limit) + .scrub_data_store(&must_exit, tranquility) .await?; } } diff --git a/src/model/block.rs b/src/model/block.rs index 0800291..406abf7 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -14,7 +14,7 @@ use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; -use garage_util::token_bucket::TokenBucket; +use garage_util::tranquilizer::Tranquilizer; use garage_rpc::system::System; use garage_rpc::*; @@ -29,6 +29,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; +pub const BACKGROUND_TRANQUILITY: u32 = 3; const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); @@ -214,24 +215,15 @@ impl BlockManager { pub async fn scrub_data_store( &self, must_exit: &watch::Receiver, - speed_limit: Option, + tranquility: u32, ) -> Result<(), Error> { - let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + let tranquilizer = Tranquilizer::new(30); self.for_each_file( - token_bucket, - move |mut token_bucket, hash| { - async move { - let len = match self.read_block(&hash).await { - Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), - Ok(_) => unreachable!(), - Err(_) => 0, // resync and warn message made by read_block if necessary - }; - - if let Some(tb) = &mut token_bucket { - tb.take(len as u64).await; - } - Ok(token_bucket) - } + tranquilizer, + move |mut tranquilizer, hash| async move { + let _ = self.read_block(&hash).await; + tranquilizer.tranquilize(tranquility).await; + Ok(tranquilizer) }, must_exit, ) @@ -381,18 +373,32 @@ impl BlockManager { } async fn resync_loop(self: Arc, mut must_exit: watch::Receiver) { + let mut tranquilizer = Tranquilizer::new(30); + while !*must_exit.borrow() { - if let Err(e) = self.resync_iter(&mut must_exit).await { - warn!("Error in block resync loop: {}", e); - select! { - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {}, - _ = must_exit.changed().fuse() => {}, + match self.resync_iter(&mut must_exit).await { + Ok(true) => { + tranquilizer.tranquilize(BACKGROUND_TRANQUILITY).await; + } + Ok(false) => { + tranquilizer.reset(); + } + Err(e) => { + // The errors that we have here are only Sled errors + // We don't really know how to handle them so just ¯\_(ツ)_/¯ + // (there is kind of an assumption that Sled won't error on us, + // if it does there is not much we can do -- TODO should we just panic?) + error!( + "Could not do a resync iteration: {} (this is a very bad error)", + e + ); + tranquilizer.reset(); } } } } - async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result<(), Error> { + async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let now = now_msec(); @@ -403,7 +409,7 @@ impl BlockManager { warn!("Error when resyncing {:?}: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } - res?; // propagate error to delay main loop + Ok(true) } else { self.resync_queue.insert(time_bytes, hash_bytes)?; let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); @@ -412,14 +418,15 @@ impl BlockManager { _ = self.resync_notify.notified().fuse() => {}, _ = must_exit.changed().fuse() => {}, } + Ok(false) } } else { select! { _ = self.resync_notify.notified().fuse() => {}, _ = must_exit.changed().fuse() => {}, } + Ok(false) } - Ok(()) } async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { diff --git a/src/util/lib.rs b/src/util/lib.rs index e2e0178..478b9ea 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -10,3 +10,4 @@ pub mod error; pub mod persister; pub mod time; pub mod token_bucket; +pub mod tranquilizer; diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs new file mode 100644 index 0000000..2871138 --- /dev/null +++ b/src/util/tranquilizer.rs @@ -0,0 +1,57 @@ +use std::collections::VecDeque; +use std::time::{Duration, Instant}; + +use tokio::time::sleep; + +/// A tranquilizer is a helper object that is used to make +/// background operations not take up too much time. +/// +/// Background operations are done in a loop that does the following: +/// - do one step of the background process +/// - tranquilize, i.e. wait some time to not overload the system +/// +/// The tranquilizer observes how long the steps take, and keeps +/// in memory a number of observations. The tranquilize operation +/// simply sleeps k * avg(observed step times), where k is +/// the tranquility factor. For instance with a tranquility of 2, +/// the tranquilizer will sleep on average 2 units of time for every +/// 1 unit of time spent doing the background task. +pub struct Tranquilizer { + n_observations: usize, + observations: VecDeque, + sum_observations: Duration, + last_step_begin: Instant, +} + +impl Tranquilizer { + pub fn new(n_observations: usize) -> Self { + Self { + n_observations, + observations: VecDeque::with_capacity(n_observations + 1), + sum_observations: Duration::ZERO, + last_step_begin: Instant::now(), + } + } + + pub async fn tranquilize(&mut self, tranquility: u32) { + let observation = Instant::now() - self.last_step_begin; + + self.observations.push_back(observation); + self.sum_observations += observation; + + while self.observations.len() > self.n_observations { + self.sum_observations -= self.observations.pop_front().unwrap(); + } + + if !self.observations.is_empty() { + let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32); + sleep(delay).await; + } + + self.reset(); + } + + pub fn reset(&mut self) { + self.last_step_begin = Instant::now(); + } +}