diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 588900a..620be9e 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -265,7 +265,7 @@ pub struct RepairOpt { pub yes: bool, #[structopt(subcommand)] - pub what: Option, + pub what: RepairWhat, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] @@ -283,8 +283,8 @@ pub enum RepairWhat { #[structopt(name = "block_refs")] BlockRefs, /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) - #[structopt(name = "blocks_integrity")] - BlockIntegrity { + #[structopt(name = "scrub")] + Scrub { /// Limit on i/o speed, in B/s #[structopt(name = "limit")] limit: Option, diff --git a/src/garage/repair.rs b/src/garage/repair.rs index a67bf2e..bfe7bf8 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -27,50 +27,38 @@ impl Repair { opt: RepairOpt, must_exit: watch::Receiver, ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage.bucket_table.syncer.add_full_sync(); - self.garage.object_table.syncer.add_full_sync(); - self.garage.version_table.syncer.add_full_sync(); - self.garage.block_ref_table.syncer.add_full_sync(); - self.garage.key_table.syncer.add_full_sync(); + match opt.what { + RepairWhat::Tables => { + info!("Launching a full sync of tables"); + self.garage.bucket_table.syncer.add_full_sync(); + self.garage.object_table.syncer.add_full_sync(); + self.garage.version_table.syncer.add_full_sync(); + self.garage.block_ref_table.syncer.add_full_sync(); + self.garage.key_table.syncer.add_full_sync(); + } + RepairWhat::Versions => { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + RepairWhat::BlockRefs => { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + RepairWhat::Blocks => { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + RepairWhat::Scrub { limit } => { + info!("Verifying integrity of stored blocks"); + self.garage + .block_manager + .scrub_data_store(&must_exit, limit) + .await?; + } } - - // TODO: wait for full sync to finish before proceeding to the rest? - - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - - if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what { - info!("Verifying integrity of stored blocks"); - self.garage - .block_manager - .verify_data_store_integrity(&must_exit, limit) - .await?; - } - Ok(()) } @@ -158,10 +146,4 @@ impl Repair { } Ok(()) } - - async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } } diff --git a/src/model/block.rs b/src/model/block.rs index c43c0b9..35d3871 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use tokio::time::Instant; use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; +use garage_util::token_bucket::TokenBucket; use garage_rpc::system::System; use garage_rpc::*; @@ -209,6 +209,35 @@ impl BlockManager { .await } + /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by + /// this function. + pub async fn scrub_data_store( + &self, + must_exit: &watch::Receiver, + speed_limit: Option, + ) -> Result<(), Error> { + let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64)); + self.for_each_file( + token_bucket, + move |mut token_bucket, hash| { + async move { + let len = match self.read_block(&hash).await { + Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), + Ok(_) => unreachable!(), + Err(_) => 0, // resync and warn message made by read_block if necessary + }; + + if let Some(tb) = &mut token_bucket { + tb.take(len as u64).await; + } + Ok(token_bucket) + } + }, + must_exit, + ) + .await + } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() @@ -553,45 +582,6 @@ impl BlockManager { } .boxed() } - - /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by - /// this function. - pub async fn verify_data_store_integrity( - &self, - must_exit: &watch::Receiver, - speed_limit: Option, - ) -> Result<(), Error> { - let last_refill = Instant::now(); - let token_left = speed_limit.unwrap_or(0); - self.for_each_file( - (last_refill, token_left), - move |(last_refill, token_left), hash| { - async move { - let len = match self.read_block(&hash).await { - Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(), - Ok(_) => unreachable!(), - Err(_) => 0, // resync and warn message made by read_block if necessary - }; - - if let Some(speed_limit) = speed_limit { - // throttling logic - if let Some(t) = token_left.checked_sub(len) { - // token bucket not empty yet - Ok((last_refill, t)) - } else { - // token bucket empty. Sleep and refill - tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await; - Ok((Instant::now(), speed_limit)) - } - } else { - Ok((last_refill, token_left)) // actually not used - } - } - }, - must_exit, - ) - .await - } } #[async_trait] diff --git a/src/util/lib.rs b/src/util/lib.rs index c080e3a..e2e0178 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -9,3 +9,4 @@ pub mod data; pub mod error; pub mod persister; pub mod time; +pub mod token_bucket; diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs new file mode 100644 index 0000000..cc0dfa1 --- /dev/null +++ b/src/util/token_bucket.rs @@ -0,0 +1,40 @@ +use std::time::{Duration, Instant}; + +use tokio::time::sleep; + +pub struct TokenBucket { + // Replenish rate: number of tokens per second + replenish_rate: u64, + // Current number of tokens + tokens: u64, + // Last replenish time + last_replenish: Instant, +} + +impl TokenBucket { + pub fn new(replenish_rate: u64) -> Self { + Self { + replenish_rate, + tokens: 0, + last_replenish: Instant::now(), + } + } + + pub async fn take(&mut self, tokens: u64) { + while self.tokens < tokens { + let needed = tokens - self.tokens; + let delay = (needed as f64) / (self.replenish_rate as f64); + sleep(Duration::from_secs_f64(delay)).await; + self.replenish(); + } + self.tokens -= tokens; + } + + pub fn replenish(&mut self) { + let now = Instant::now(); + let new_tokens = + ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; + self.tokens += new_tokens; + self.last_replenish = now; + } +}