Compare commits

..

No commits in common. "6b47c294f570b141a2349d5b6da537c0b64d165d" and "4e8af1d95645447ee879c9681775c35db1764d58" have entirely different histories.

5 changed files with 70 additions and 161 deletions

View file

@ -265,7 +265,7 @@ pub struct RepairOpt {
pub yes: bool, pub yes: bool,
#[structopt(subcommand)] #[structopt(subcommand)]
pub what: RepairWhat, pub what: Option<RepairWhat>,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
@ -282,13 +282,6 @@ pub enum RepairWhat {
/// Only redo the propagation of version deletions to the block ref table (extremely slow) /// Only redo the propagation of version deletions to the block ref table (extremely slow)
#[structopt(name = "block_refs")] #[structopt(name = "block_refs")]
BlockRefs, BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
/// Limit on i/o speed, in B/s
#[structopt(name = "limit")]
limit: Option<usize>,
},
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]

View file

@ -27,38 +27,42 @@ impl Repair {
opt: RepairOpt, opt: RepairOpt,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
match opt.what { let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
RepairWhat::Tables => {
info!("Launching a full sync of tables"); if todo(RepairWhat::Tables) {
self.garage.bucket_table.syncer.add_full_sync(); info!("Launching a full sync of tables");
self.garage.object_table.syncer.add_full_sync(); self.garage.bucket_table.syncer.add_full_sync();
self.garage.version_table.syncer.add_full_sync(); self.garage.object_table.syncer.add_full_sync();
self.garage.block_ref_table.syncer.add_full_sync(); self.garage.version_table.syncer.add_full_sync();
self.garage.key_table.syncer.add_full_sync(); self.garage.block_ref_table.syncer.add_full_sync();
} self.garage.key_table.syncer.add_full_sync();
RepairWhat::Versions => {
info!("Repairing the versions table");
self.repair_versions(&must_exit).await?;
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
self.repair_block_ref(&must_exit).await?;
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
}
RepairWhat::Scrub { limit } => {
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
.scrub_data_store(&must_exit, limit)
.await?;
}
} }
// TODO: wait for full sync to finish before proceeding to the rest?
if todo(RepairWhat::Versions) {
info!("Repairing the versions table");
self.repair_versions(&must_exit).await?;
}
if todo(RepairWhat::BlockRefs) {
info!("Repairing the block refs table");
self.repair_block_ref(&must_exit).await?;
}
if opt.what.is_none() {
info!("Repairing the RC");
self.repair_rc(&must_exit).await?;
}
if todo(RepairWhat::Blocks) {
info!("Repairing the stored blocks");
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
}
Ok(()) Ok(())
} }
@ -146,4 +150,10 @@ impl Repair {
} }
Ok(()) Ok(())
} }
async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// TODO
warn!("repair_rc: not implemented");
Ok(())
}
} }

View file

@ -14,7 +14,6 @@ use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::time::*; use garage_util::time::*;
use garage_util::token_bucket::TokenBucket;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -198,44 +197,10 @@ impl BlockManager {
} }
// 2. Repair blocks actually on disk // 2. Repair blocks actually on disk
// Lists all blocks on disk and adds them to the resync queue. self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
// This allows us to find blocks we are storing but don't actually need, .await?;
// so that we can offload them if necessary and then delete them locally.
self.for_each_file(
(),
move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
must_exit,
)
.await
}
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by Ok(())
/// this function.
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
speed_limit: Option<usize>,
) -> Result<(), Error> {
let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
self.for_each_file(
token_bucket,
move |mut token_bucket, hash| {
async move {
let len = match self.read_block(&hash).await {
Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
Ok(_) => unreachable!(),
Err(_) => 0, // resync and warn message made by read_block if necessary
};
if let Some(tb) = &mut token_bucket {
tb.take(len as u64).await;
}
Ok(token_bucket)
}
},
must_exit,
)
.await
} }
/// Get lenght of resync queue /// Get lenght of resync queue
@ -520,65 +485,47 @@ impl BlockManager {
Ok(()) Ok(())
} }
async fn for_each_file<F, Fut, State>( fn repair_aux_read_dir_rec<'a>(
&self,
state: State,
mut f: F,
must_exit: &watch::Receiver<bool>,
) -> Result<(), Error>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send,
{
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
.await
.map(|_| ())
}
fn for_each_file_rec<'a, F, Fut, State>(
&'a self, &'a self,
path: &'a Path, path: &'a Path,
mut state: State,
f: &'a mut F,
must_exit: &'a watch::Receiver<bool>, must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, Result<State, Error>> ) -> BoxFuture<'a, Result<(), Error>> {
where // Lists all blocks on disk and adds them to the resync queue.
F: FnMut(State, Hash) -> Fut + Send, // This allows us to find blocks we are storing but don't actually need,
Fut: Future<Output = Result<State, Error>> + Send, // so that we can offload them if necessary and then delete them locally.
State: Send + 'a,
{
async move { async move {
let mut ls_data_dir = fs::read_dir(path).await?; let mut ls_data_dir = fs::read_dir(path).await?;
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? { loop {
if *must_exit.borrow() { let data_dir_ent = ls_data_dir.next_entry().await?;
break; let data_dir_ent = match data_dir_ent {
} Some(x) => x,
None => break,
};
let name = data_dir_ent.file_name(); let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() { let name = match name.into_string() {
n Ok(x) => x,
} else { Err(_) => continue,
continue;
}; };
let ent_type = data_dir_ent.file_type().await?; let ent_type = data_dir_ent.file_type().await?;
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
state = self self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
.await?; .await?;
} else if name.len() == 64 { } else if name.len() == 64 {
let hash_bytes = if let Ok(h) = hex::decode(&name) { let hash_bytes = match hex::decode(&name) {
h Ok(h) => h,
} else { Err(_) => continue,
continue;
}; };
let mut hash = [0u8; 32]; let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]); hash.copy_from_slice(&hash_bytes[..]);
state = f(state, hash.into()).await?; self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
}
if *must_exit.borrow() {
break;
} }
} }
Ok(state) Ok(())
} }
.boxed() .boxed()
} }
@ -651,7 +598,7 @@ impl BlockManagerLocked {
); );
let path = mgr.block_path(hash); let path = mgr.block_path(hash);
let mut path2 = path.clone(); let mut path2 = path.clone();
path2.set_extension("corrupted"); path2.set_extension(".corrupted");
fs::rename(path, path2).await?; fs::rename(path, path2).await?;
mgr.put_to_resync(hash, Duration::from_millis(0))?; mgr.put_to_resync(hash, Duration::from_millis(0))?;
Ok(()) Ok(())

View file

@ -9,4 +9,3 @@ pub mod data;
pub mod error; pub mod error;
pub mod persister; pub mod persister;
pub mod time; pub mod time;
pub mod token_bucket;

View file

@ -1,40 +0,0 @@
use std::time::{Duration, Instant};
use tokio::time::sleep;
pub struct TokenBucket {
// Replenish rate: number of tokens per second
replenish_rate: u64,
// Current number of tokens
tokens: u64,
// Last replenish time
last_replenish: Instant,
}
impl TokenBucket {
pub fn new(replenish_rate: u64) -> Self {
Self {
replenish_rate,
tokens: 0,
last_replenish: Instant::now(),
}
}
pub async fn take(&mut self, tokens: u64) {
while self.tokens < tokens {
let needed = tokens - self.tokens;
let delay = (needed as f64) / (self.replenish_rate as f64);
sleep(Duration::from_secs_f64(delay)).await;
self.replenish();
}
self.tokens -= tokens;
}
pub fn replenish(&mut self) {
let now = Instant::now();
let new_tokens =
((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
self.tokens += new_tokens;
self.last_replenish = now;
}
}