diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index f134cd49..620be9ef 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -265,7 +265,7 @@ pub struct RepairOpt { pub yes: bool, #[structopt(subcommand)] - pub what: Option<RepairWhat>, + pub what: RepairWhat, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] @@ -282,6 +282,13 @@ pub enum RepairWhat { /// Only redo the propagation of version deletions to the block ref table (extremely slow) #[structopt(name = "block_refs")] BlockRefs, + /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + #[structopt(name = "scrub")] + Scrub { + /// Limit on i/o speed, in B/s + #[structopt(name = "limit")] + limit: Option<usize>, + }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 8200f1f0..bfe7bf84 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -27,42 +27,38 @@ impl Repair { opt: RepairOpt, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + self.garage.bucket_table.syncer.add_full_sync(); + self.garage.object_table.syncer.add_full_sync(); + self.garage.version_table.syncer.add_full_sync(); + self.garage.block_ref_table.syncer.add_full_sync(); + self.garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + 
self.repair_block_ref(&must_exit).await?; + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + RepairWhat::Scrub { limit } => { + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, limit) + .await?; + } } - - // TODO: wait for full sync to finish before proceeding to the rest? - - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - Ok(()) } @@ -150,10 +146,4 @@ impl Repair { } Ok(()) } - - async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } } diff --git a/src/model/block.rs b/src/model/block.rs index d1ea1512..35d3871a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -14,6 +14,7 @@ use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; +use garage_util::token_bucket::TokenBucket; use garage_rpc::system::System; use garage_rpc::*; @@ -197,10 +198,44 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit) - .await?; + // Lists all blocks on disk and adds them to the resync queue. + // This allows us to find blocks we are storing but don't actually need, + // so that we can offload them if necessary and then delete them locally. 
+ self.for_each_file( + (), + move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) }, + must_exit, + ) + .await + } - Ok(()) + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn scrub_data_store( + &self, + must_exit: &watch::Receiver<bool>, + speed_limit: Option<usize>, + ) -> Result<(), Error> { + let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + self.for_each_file( + token_bucket, + move |mut token_bucket, hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(tb) = &mut token_bucket { + tb.take(len as u64).await; + } + Ok(token_bucket) + } + }, + must_exit, + ) + .await } /// Get lenght of resync queue @@ -485,47 +520,65 @@ impl BlockManager { Ok(()) } - fn repair_aux_read_dir_rec<'a>( + async fn for_each_file<F, Fut, State>( + &self, + state: State, + mut f: F, + must_exit: &watch::Receiver<bool>, + ) -> Result<(), Error> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future<Output = Result<State, Error>> + Send, + State: Send, + { + self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit) + .await + .map(|_| ()) + } + + fn for_each_file_rec<'a, F, Fut, State>( + &'a self, path: &'a Path, + mut state: State, + f: &'a mut F, must_exit: &'a watch::Receiver<bool>, - ) -> BoxFuture<'a, Result<(), Error>> { - // Lists all blocks on disk and adds them to the resync queue. - // This allows us to find blocks we are storing but don't actually need, - // so that we can offload them if necessary and then delete them locally. 
+ ) -> BoxFuture<'a, Result<State, Error>> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future<Output = Result<State, Error>> + Send, + State: Send + 'a, + { async move { let mut ls_data_dir = fs::read_dir(path).await?; - loop { - let data_dir_ent = ls_data_dir.next_entry().await?; - let data_dir_ent = match data_dir_ent { - Some(x) => x, - None => break, - }; + while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { + if *must_exit.borrow() { + break; + } + let name = data_dir_ent.file_name(); - let name = match name.into_string() { - Ok(x) => x, - Err(_) => continue, + let name = if let Ok(n) = name.into_string() { + n + } else { + continue; }; let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + state = self + .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) .await?; } else if name.len() == 64 { - let hash_bytes = match hex::decode(&name) { - Ok(h) => h, - Err(_) => continue, + let hash_bytes = if let Ok(h) = hex::decode(&name) { + h + } else { + continue; }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), Duration::from_secs(0))?; - } - - if *must_exit.borrow() { - break; + state = f(state, hash.into()).await?; } } - Ok(()) + Ok(state) } .boxed() } @@ -598,7 +651,7 @@ impl BlockManagerLocked { ); let path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; mgr.put_to_resync(hash, Duration::from_millis(0))?; Ok(()) diff --git a/src/util/lib.rs b/src/util/lib.rs index c080e3a3..e2e01785 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -9,3 +9,4 @@ pub mod data; pub mod error; pub mod persister; pub mod time; +pub mod token_bucket; diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs new file mode 100644 index 00000000..cc0dfa1f --- /dev/null +++ b/src/util/token_bucket.rs 
@@ -0,0 +1,40 @@ +use std::time::{Duration, Instant}; + +use tokio::time::sleep; + +pub struct TokenBucket { + // Replenish rate: number of tokens per second + replenish_rate: u64, + // Current number of tokens + tokens: u64, + // Last replenish time + last_replenish: Instant, +} + +impl TokenBucket { + pub fn new(replenish_rate: u64) -> Self { + Self { + replenish_rate, + tokens: 0, + last_replenish: Instant::now(), + } + } + + pub async fn take(&mut self, tokens: u64) { + while self.tokens < tokens { + let needed = tokens - self.tokens; + let delay = (needed as f64) / (self.replenish_rate as f64); + sleep(Duration::from_secs_f64(delay)).await; + self.replenish(); + } + self.tokens -= tokens; + } + + pub fn replenish(&mut self) { + let now = Instant::now(); + let new_tokens = + ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; + self.tokens += new_tokens; + self.last_replenish = now; + } +}