add cli parameter to verify local bloc integrity
This commit is contained in:
parent
30a7dee920
commit
d87f481df0
3 changed files with 114 additions and 0 deletions
|
@ -283,6 +283,9 @@ pub enum RepairWhat {
|
|||
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
|
||||
#[structopt(name = "block_refs")]
|
||||
BlockRefs,
|
||||
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
|
||||
#[structopt(name = "blocks_integrity")]
|
||||
BlockIntegrity,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||
|
|
|
@ -63,6 +63,14 @@ impl Repair {
|
|||
.await?;
|
||||
}
|
||||
|
||||
if todo(RepairWhat::BlockIntegrity) {
|
||||
info!("Verifyin integrity of stored blocks");
|
||||
self.garage
|
||||
.block_manager
|
||||
.verify_data_store_integrity(&must_exit, None)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
|||
use tokio::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::sync::{watch, Mutex, Notify};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
@ -521,6 +522,108 @@ impl BlockManager {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
async fn for_each_file<F, Fut, State>(
|
||||
&self,
|
||||
state: State,
|
||||
mut f: F,
|
||||
must_exit: &watch::Receiver<bool>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
F: FnMut(State, Hash) -> Fut + Send,
|
||||
Fut: Future<Output = Result<State, Error>> + Send,
|
||||
State: Send,
|
||||
{
|
||||
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
fn for_each_file_rec<'a, F, Fut, State>(
|
||||
&'a self,
|
||||
path: &'a Path,
|
||||
mut state: State,
|
||||
f: &'a mut F,
|
||||
must_exit: &'a watch::Receiver<bool>,
|
||||
) -> BoxFuture<'a, Result<State, Error>>
|
||||
where
|
||||
F: FnMut(State, Hash) -> Fut + Send,
|
||||
Fut: Future<Output = Result<State, Error>> + Send,
|
||||
State: Send + 'a,
|
||||
{
|
||||
async move {
|
||||
let mut ls_data_dir = fs::read_dir(path).await?;
|
||||
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
|
||||
if *must_exit.borrow() {
|
||||
break;
|
||||
}
|
||||
|
||||
let name = data_dir_ent.file_name();
|
||||
let name = if let Ok(n) = name.into_string() {
|
||||
n
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
let ent_type = data_dir_ent.file_type().await?;
|
||||
|
||||
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
||||
state = self
|
||||
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
|
||||
.await?;
|
||||
} else if name.len() == 64 {
|
||||
let hash_bytes = if let Ok(h) = hex::decode(&name) {
|
||||
h
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
let mut hash = [0u8; 32];
|
||||
hash.copy_from_slice(&hash_bytes[..]);
|
||||
state = f(state, hash.into()).await?;
|
||||
}
|
||||
}
|
||||
Ok(state)
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// Verify integrity of each block on disc. Use `speed_limit` to limit the load generated by
|
||||
/// this function.
|
||||
pub async fn verify_data_store_integrity(
|
||||
&self,
|
||||
must_exit: &watch::Receiver<bool>,
|
||||
speed_limit: Option<usize>,
|
||||
) -> Result<(), Error> {
|
||||
let last_refill = Instant::now();
|
||||
let token_left = speed_limit.unwrap_or(0);
|
||||
self.for_each_file(
|
||||
(last_refill, token_left),
|
||||
move |(last_refill, token_left), hash| {
|
||||
async move {
|
||||
let len = match self.read_block(&hash).await {
|
||||
Ok(Message::PutBlock(PutBlockMessage { data, .. })) => data.len(),
|
||||
Ok(_) => unreachable!(),
|
||||
Err(_) => todo!("log error"),
|
||||
};
|
||||
|
||||
if let Some(speed_limit) = speed_limit {
|
||||
// throttling logic
|
||||
if let Some(t) = token_left.checked_sub(len) {
|
||||
// token bucket not empty yet
|
||||
Ok((last_refill, t))
|
||||
} else {
|
||||
// token bucket empty. Sleep and refill
|
||||
tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
|
||||
Ok((Instant::now(), speed_limit))
|
||||
}
|
||||
} else {
|
||||
Ok((last_refill, token_left)) // actually not used
|
||||
}
|
||||
}
|
||||
},
|
||||
must_exit,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get lenght of resync queue
|
||||
pub fn resync_queue_len(&self) -> usize {
|
||||
self.resync_queue.len()
|
||||
|
|
Loading…
Reference in a new issue