From 28c015d9ffcb3255295572465654fdca680b1964 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Thu, 24 Jun 2021 01:34:28 +0200 Subject: [PATCH 1/2] add cli parameter to verify local bloc integrity reuse code for listing local blocks add disk i/o speed limit on integrity check --- src/garage/cli/structs.rs | 7 +++ src/garage/repair.rs | 8 +++ src/model/block.rs | 121 +++++++++++++++++++++++++++++--------- 3 files changed, 107 insertions(+), 29 deletions(-) diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index f134cd49..588900a3 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -282,6 +282,13 @@ pub enum RepairWhat { /// Only redo the propagation of version deletions to the block ref table (extremely slow) #[structopt(name = "block_refs")] BlockRefs, + /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + #[structopt(name = "blocks_integrity")] + BlockIntegrity { + /// Limit on i/o speed, in B/s + #[structopt(name = "limit")] + limit: Option, + }, } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 8200f1f0..a67bf2e5 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -63,6 +63,14 @@ impl Repair { .await?; } + if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what { + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .verify_data_store_integrity(&must_exit, limit) + .await?; + } + Ok(()) } diff --git a/src/model/block.rs b/src/model/block.rs index d1ea1512..c43c0b97 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use tokio::time::Instant; use garage_util::data::*; use garage_util::error::Error; @@ -197,10 +198,15 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit) - .await?; - - Ok(()) + // Lists all blocks on disk and adds them to the resync queue. + // This allows us to find blocks we are storing but don't actually need, + // so that we can offload them if necessary and then delete them locally. + self.for_each_file( + (), + move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) }, + must_exit, + ) + .await } /// Get lenght of resync queue @@ -485,50 +491,107 @@ impl BlockManager { Ok(()) } - fn repair_aux_read_dir_rec<'a>( + async fn for_each_file( + &self, + state: State, + mut f: F, + must_exit: &watch::Receiver, + ) -> Result<(), Error> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future> + Send, + State: Send, + { + self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit) + .await + .map(|_| ()) + } + + fn for_each_file_rec<'a, F, Fut, State>( &'a self, path: &'a Path, + mut state: State, + f: &'a mut F, must_exit: &'a watch::Receiver, - ) -> BoxFuture<'a, Result<(), Error>> { - // Lists all blocks on disk and adds them to the resync queue. - // This allows us to find blocks we are storing but don't actually need, - // so that we can offload them if necessary and then delete them locally. 
+ ) -> BoxFuture<'a, Result> + where + F: FnMut(State, Hash) -> Fut + Send, + Fut: Future> + Send, + State: Send + 'a, + { async move { let mut ls_data_dir = fs::read_dir(path).await?; - loop { - let data_dir_ent = ls_data_dir.next_entry().await?; - let data_dir_ent = match data_dir_ent { - Some(x) => x, - None => break, - }; + while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { + if *must_exit.borrow() { + break; + } + let name = data_dir_ent.file_name(); - let name = match name.into_string() { - Ok(x) => x, - Err(_) => continue, + let name = if let Ok(n) = name.into_string() { + n + } else { + continue; }; let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + state = self + .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) .await?; } else if name.len() == 64 { - let hash_bytes = match hex::decode(&name) { - Ok(h) => h, - Err(_) => continue, + let hash_bytes = if let Ok(h) = hex::decode(&name) { + h + } else { + continue; }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), Duration::from_secs(0))?; - } - - if *must_exit.borrow() { - break; + state = f(state, hash.into()).await?; } } - Ok(()) + Ok(state) } .boxed() } + + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn verify_data_store_integrity( + &self, + must_exit: &watch::Receiver, + speed_limit: Option, + ) -> Result<(), Error> { + let last_refill = Instant::now(); + let token_left = speed_limit.unwrap_or(0); + self.for_each_file( + (last_refill, token_left), + move |(last_refill, token_left), hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(speed_limit) = speed_limit { + // throttling logic + if let Some(t) = token_left.checked_sub(len) { + // token bucket not empty yet + Ok((last_refill, t)) + } else { + // token bucket empty. 
Sleep and refill + tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await; + Ok((Instant::now(), speed_limit)) + } + } else { + Ok((last_refill, token_left)) // actually not used + } + } + }, + must_exit, + ) + .await + } } #[async_trait] @@ -598,7 +661,7 @@ impl BlockManagerLocked { ); let path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; mgr.put_to_resync(hash, Duration::from_millis(0))?; Ok(()) -- 2.43.4 From 6b47c294f570b141a2349d5b6da537c0b64d165d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Oct 2021 10:36:04 +0200 Subject: [PATCH 2/2] Refactoring on repair commands --- src/garage/cli/structs.rs | 6 +-- src/garage/repair.rs | 80 +++++++++++++++------------------------ src/model/block.rs | 70 +++++++++++++++------------------- src/util/lib.rs | 1 + src/util/token_bucket.rs | 40 ++++++++++++++++++++ 5 files changed, 105 insertions(+), 92 deletions(-) create mode 100644 src/util/token_bucket.rs diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 588900a3..620be9ef 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -265,7 +265,7 @@ pub struct RepairOpt { pub yes: bool, #[structopt(subcommand)] - pub what: Option, + pub what: RepairWhat, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] @@ -283,8 +283,8 @@ pub enum RepairWhat { #[structopt(name = "block_refs")] BlockRefs, /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) - #[structopt(name = "blocks_integrity")] - BlockIntegrity { + #[structopt(name = "scrub")] + Scrub { /// Limit on i/o speed, in B/s #[structopt(name = "limit")] limit: Option, diff --git a/src/garage/repair.rs b/src/garage/repair.rs index a67bf2e5..bfe7bf84 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -27,50 +27,38 @@ impl Repair { opt: RepairOpt, must_exit: watch::Receiver, ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + self.garage.bucket_table.syncer.add_full_sync(); + self.garage.object_table.syncer.add_full_sync(); + self.garage.version_table.syncer.add_full_sync(); + self.garage.block_ref_table.syncer.add_full_sync(); + self.garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + RepairWhat::Scrub { limit } => { + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, limit) + .await?; + } } - - // TODO: wait for full sync to finish before proceeding to the rest? 
- - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - - if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .verify_data_store_integrity(&must_exit, limit) - .await?; - } - Ok(()) } @@ -158,10 +146,4 @@ impl Repair { } Ok(()) } - - async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } } diff --git a/src/model/block.rs b/src/model/block.rs index c43c0b97..35d3871a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use tokio::time::Instant; use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; +use garage_util::token_bucket::TokenBucket; use garage_rpc::system::System; use garage_rpc::*; @@ -209,6 +209,35 @@ impl BlockManager { .await } + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn scrub_data_store( + &self, + must_exit: &watch::Receiver, + speed_limit: Option, + ) -> Result<(), Error> { + let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + self.for_each_file( + token_bucket, + move |mut token_bucket, hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(tb) = &mut token_bucket { + tb.take(len as u64).await; + } + Ok(token_bucket) + } + }, + must_exit, + ) + .await + } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() @@ -553,45 +582,6 @@ impl BlockManager { } .boxed() } - - /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by - /// this function. - pub async fn verify_data_store_integrity( - &self, - must_exit: &watch::Receiver, - speed_limit: Option, - ) -> Result<(), Error> { - let last_refill = Instant::now(); - let token_left = speed_limit.unwrap_or(0); - self.for_each_file( - (last_refill, token_left), - move |(last_refill, token_left), hash| { - async move { - let len = match self.read_block(&hash).await { - Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), - Ok(_) => unreachable!(), - Err(_) => 0, // resync and warn message made by read_block if necessary - }; - - if let Some(speed_limit) = speed_limit { - // throttling logic - if let Some(t) = token_left.checked_sub(len) { - // token bucket not empty yet - Ok((last_refill, t)) - } else { - // token bucket empty. 
Sleep and refill - tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await; - Ok((Instant::now(), speed_limit)) - } - } else { - Ok((last_refill, token_left)) // actually not used - } - } - }, - must_exit, - ) - .await - } } #[async_trait] diff --git a/src/util/lib.rs b/src/util/lib.rs index c080e3a3..e2e01785 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -9,3 +9,4 @@ pub mod data; pub mod error; pub mod persister; pub mod time; +pub mod token_bucket; diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs new file mode 100644 index 00000000..cc0dfa1f --- /dev/null +++ b/src/util/token_bucket.rs @@ -0,0 +1,40 @@ +use std::time::{Duration, Instant}; + +use tokio::time::sleep; + +pub struct TokenBucket { + // Replenish rate: number of tokens per second + replenish_rate: u64, + // Current number of tokens + tokens: u64, + // Last replenish time + last_replenish: Instant, +} + +impl TokenBucket { + pub fn new(replenish_rate: u64) -> Self { + Self { + replenish_rate, + tokens: 0, + last_replenish: Instant::now(), + } + } + + pub async fn take(&mut self, tokens: u64) { + while self.tokens < tokens { + let needed = tokens - self.tokens; + let delay = (needed as f64) / (self.replenish_rate as f64); + sleep(Duration::from_secs_f64(delay)).await; + self.replenish(); + } + self.tokens -= tokens; + } + + pub fn replenish(&mut self) { + let now = Instant::now(); + let new_tokens = + ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; + self.tokens += new_tokens; + self.last_replenish = now; + } +} -- 2.43.4
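
A note on the first patch's main refactor: repair_aux_read_dir_rec is generalised into for_each_file / for_each_file_rec, a recursive boxed-future directory walk that threads a caller-supplied piece of state through an async callback, so the same traversal can feed the resync queue (repair) or the scrub loop. The following standalone sketch illustrates only that state-threading pattern, under simplified assumptions: it walks plain files instead of hex-named block directories, drops the must_exit channel, and uses a made-up file-counting callback. It is not Garage's actual API; it needs only the futures and tokio crates.

use std::path::{Path, PathBuf};

use futures::future::{BoxFuture, FutureExt};
use tokio::fs;

/// Recursively visit every file under `path`, passing `state` through the
/// async callback and returning its final value (same shape as
/// for_each_file_rec in the patch, minus the must_exit check and the
/// hex-name filtering of block files).
fn for_each_file_rec<'a, F, Fut, State>(
    path: &'a Path,
    mut state: State,
    f: &'a mut F,
) -> BoxFuture<'a, std::io::Result<State>>
where
    F: FnMut(State, PathBuf) -> Fut + Send,
    Fut: std::future::Future<Output = std::io::Result<State>> + Send,
    State: Send + 'a,
{
    async move {
        let mut dir = fs::read_dir(path).await?;
        while let Some(entry) = dir.next_entry().await? {
            if entry.file_type().await?.is_dir() {
                // Recurse into subdirectories, handing the state down and
                // taking back its updated value.
                state = for_each_file_rec(&entry.path(), state, f).await?;
            } else {
                // Leaf file: let the callback update the state.
                state = f(state, entry.path()).await?;
            }
        }
        Ok(state)
    }
    .boxed()
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Illustrative callback: count files while printing them.
    let total = for_each_file_rec(Path::new("."), 0usize, &mut |n: usize, p: PathBuf| async move {
        println!("{}", p.display());
        Ok::<usize, std::io::Error>(n + 1)
    })
    .await?;
    println!("{} files visited", total);
    Ok(())
}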
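
On throttling: the second patch swaps the first patch's inline rate limiting (a manual Instant plus a once-per-second refill) for a reusable TokenBucket in garage_util, where tokens are bytes and take(len) sleeps just long enough for the deficit to be replenished at the configured rate. The sketch below is a minimal, self-contained illustration of that pacing: it inlines the same TokenBucket shown in src/util/token_bucket.rs above, and drives it with made-up block sizes and a 1 MB/s limit standing in for the read_block() calls that scrub_data_store performs. Only tokio is required.

use std::time::{Duration, Instant};

use tokio::time::sleep;

// Same token bucket as src/util/token_bucket.rs in the patch: tokens are
// bytes, replenished at `replenish_rate` bytes per second.
pub struct TokenBucket {
    replenish_rate: u64,
    tokens: u64,
    last_replenish: Instant,
}

impl TokenBucket {
    pub fn new(replenish_rate: u64) -> Self {
        Self {
            replenish_rate,
            tokens: 0,
            last_replenish: Instant::now(),
        }
    }

    /// Wait (asynchronously) until `tokens` tokens are available, then spend them.
    pub async fn take(&mut self, tokens: u64) {
        while self.tokens < tokens {
            let needed = tokens - self.tokens;
            let delay = (needed as f64) / (self.replenish_rate as f64);
            sleep(Duration::from_secs_f64(delay)).await;
            self.replenish();
        }
        self.tokens -= tokens;
    }

    pub fn replenish(&mut self) {
        let now = Instant::now();
        let new_tokens =
            ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
        self.tokens += new_tokens;
        self.last_replenish = now;
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical scrub loop: pretend each "block" read returned this many bytes.
    let fake_block_sizes = [1_000_000u64, 400_000, 2_500_000, 800_000];
    // Limit scrub I/O to ~1 MB/s, as scrub_data_store does when a limit is given.
    let mut bucket = TokenBucket::new(1_000_000);

    let start = Instant::now();
    for (i, &len) in fake_block_sizes.iter().enumerate() {
        // In the real code this is where read_block() verified the block's hash.
        bucket.take(len).await;
        println!("block {} ({} bytes) done at t = {:.2}s", i, len, start.elapsed().as_secs_f64());
    }
}

Compared with the first version, which always slept until the next one-second refill boundary and then reset the budget, the bucket computes the exact sleep needed for the current deficit, so it remains accurate for blocks larger than one second's worth of budget and can be reused elsewhere through garage_util.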
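
For reference, with both patches applied the scrub is exposed as a repair subcommand. Assuming the usual garage CLI entry point and the structopt definitions above (a --yes confirmation flag on RepairOpt and an optional positional limit argument in bytes per second), an invocation limited to roughly 10 MB/s would look like `garage repair --yes scrub 10000000`; omitting the limit runs the scrub unthrottled.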