add cli command to verify local block integrity #94

Closed
trinity-1686a wants to merge 3 commits from trinity-1686a:cli-verify-integrity into main
3 changed files with 110 additions and 32 deletions

View file

@ -262,7 +262,7 @@ pub struct RepairOpt {
pub all_nodes: bool, pub all_nodes: bool,
/// Confirm the launch of the repair operation /// Confirm the launch of the repair operation
#[structopt(long = "yes")] #[structopt(short = "y", long = "yes")]
pub yes: bool, pub yes: bool,
#[structopt(subcommand)] #[structopt(subcommand)]
@ -283,6 +283,13 @@ pub enum RepairWhat {
/// Only redo the propagation of version deletions to the block ref table (extremely slow) /// Only redo the propagation of version deletions to the block ref table (extremely slow)
#[structopt(name = "block_refs")] #[structopt(name = "block_refs")]
BlockRefs, BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "blocks_integrity")]
BlockIntegrity {
/// Limit on i/o speed, in B/s
#[structopt(name = "limit")]
limit: Option<usize>,
},
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]

View file

@ -63,6 +63,14 @@ impl Repair {
.await?; .await?;
} }
if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what {
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
.verify_data_store_integrity(&must_exit, limit)
.await?;
}
Ok(()) Ok(())
} }

View file

@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify}; use tokio::sync::{watch, Mutex, Notify};
use tokio::time::Instant;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
@ -198,7 +199,7 @@ impl BlockManager {
hash hash
); );
let mut path2 = path.clone(); let mut path2 = path.clone();
path2.set_extension(".corrupted"); path2.set_extension("corrupted");
Review

files where getting named <hash>..corrupted

files where getting named `<hash>..corrupted`
fs::rename(path, path2).await?; fs::rename(path, path2).await?;
self.put_to_resync(&hash, Duration::from_millis(0))?; self.put_to_resync(&hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash)); return Err(Error::CorruptData(*hash));
@ -470,57 +471,119 @@ impl BlockManager {
} }
// 2. Repair blocks actually on disk // 2. Repair blocks actually on disk
self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
.await?;
Ok(())
}
fn repair_aux_read_dir_rec<'a>(
&'a self,
path: &'a Path,
must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, Result<(), Error>> {
// Lists all blocks on disk and adds them to the resync queue. // Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need, // This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally. // so that we can offload them if necessary and then delete them locally.
self.for_each_file(
(),
move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
must_exit,
)
.await
}
async fn for_each_file<F, Fut, State>(
&self,
state: State,
mut f: F,
must_exit: &watch::Receiver<bool>,
) -> Result<(), Error>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send,
{
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
.await
.map(|_| ())
}
fn for_each_file_rec<'a, F, Fut, State>(
&'a self,
path: &'a Path,
mut state: State,
f: &'a mut F,
must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, Result<State, Error>>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send + 'a,
{
async move { async move {
let mut ls_data_dir = fs::read_dir(path).await?; let mut ls_data_dir = fs::read_dir(path).await?;
loop { while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
let data_dir_ent = ls_data_dir.next_entry().await?; if *must_exit.borrow() {
let data_dir_ent = match data_dir_ent { break;
Some(x) => x, }
None => break,
};
let name = data_dir_ent.file_name(); let name = data_dir_ent.file_name();
let name = match name.into_string() { let name = if let Ok(n) = name.into_string() {
Ok(x) => x, n
Err(_) => continue, } else {
continue;
}; };
let ent_type = data_dir_ent.file_type().await?; let ent_type = data_dir_ent.file_type().await?;
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) state = self
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
.await?; .await?;
} else if name.len() == 64 { } else if name.len() == 64 {
let hash_bytes = match hex::decode(&name) { let hash_bytes = if let Ok(h) = hex::decode(&name) {
Ok(h) => h, h
Err(_) => continue, } else {
continue;
}; };
let mut hash = [0u8; 32]; let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]); hash.copy_from_slice(&hash_bytes[..]);
self.put_to_resync(&hash.into(), Duration::from_secs(0))?; state = f(state, hash.into()).await?;
}
if *must_exit.borrow() {
break;
} }
} }
Ok(()) Ok(state)
} }
.boxed() .boxed()
} }
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
/// this function.
pub async fn verify_data_store_integrity(
&self,
must_exit: &watch::Receiver<bool>,
speed_limit: Option<usize>,
) -> Result<(), Error> {
let last_refill = Instant::now();
let token_left = speed_limit.unwrap_or(0);
self.for_each_file(
(last_refill, token_left),
move |(last_refill, token_left), hash| {
async move {
let len = match self.read_block(&hash).await {
Ok(Message::PutBlock(PutBlockMessage { data, .. })) => data.len(),
Ok(_) => unreachable!(),
Err(_) => 0, // resync and warn message made by read_block if necessary
};
if let Some(speed_limit) = speed_limit {
// throttling logic
if let Some(t) = token_left.checked_sub(len) {
// token bucket not empty yet
Ok((last_refill, t))
} else {
// token bucket empty. Sleep and refill
tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
Ok((Instant::now(), speed_limit))
}
} else {
Ok((last_refill, token_left)) // actually not used
}
}
},
must_exit,
)
.await
}
/// Get lenght of resync queue /// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len() self.resync_queue.len()