add cli command to verify local block integrity #94
1 changed file with 8 additions and 48 deletions
@@ -471,55 +471,15 @@ impl BlockManager {
 		}
 
 		// 2. Repair blocks actually on disk
-		self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
-			.await?;
-
-		Ok(())
-	}
-
-	fn repair_aux_read_dir_rec<'a>(
-		&'a self,
-		path: &'a Path,
-		must_exit: &'a watch::Receiver<bool>,
-	) -> BoxFuture<'a, Result<(), Error>> {
 		// Lists all blocks on disk and adds them to the resync queue.
 		// This allows us to find blocks we are storing but don't actually need,
 		// so that we can offload them if necessary and then delete them locally.
-		async move {
-			let mut ls_data_dir = fs::read_dir(path).await?;
-			loop {
-				let data_dir_ent = ls_data_dir.next_entry().await?;
-				let data_dir_ent = match data_dir_ent {
-					Some(x) => x,
-					None => break,
-				};
-				let name = data_dir_ent.file_name();
-				let name = match name.into_string() {
-					Ok(x) => x,
-					Err(_) => continue,
-				};
-				let ent_type = data_dir_ent.file_type().await?;
-
-				if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
-					self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
-						.await?;
-				} else if name.len() == 64 {
-					let hash_bytes = match hex::decode(&name) {
-						Ok(h) => h,
-						Err(_) => continue,
-					};
-					let mut hash = [0u8; 32];
-					hash.copy_from_slice(&hash_bytes[..]);
-					self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
-				}
-
-				if *must_exit.borrow() {
-					break;
-				}
-			}
-			Ok(())
-		}
-		.boxed()
+		self.for_each_file(
+			(),
+			move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
+			must_exit,
+		)
+		.await
 	}
 
 	async fn for_each_file<F, Fut, State>(
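The recursive walker deleted above is replaced by a call to a generic `for_each_file` helper whose body sits further down in the file and is not part of this hunk. As a rough, self-contained sketch of the traversal pattern being factored out, assuming a tokio runtime plus the `futures` and `hex` crates, a free function like the hypothetical `for_each_block_file` below walks the nested two-hex-character directories and invokes an async callback for every file whose name is a 64-character hex hash; every detail beyond what the diff itself shows is an assumption.

```rust
// Hypothetical sketch only: names, signatures and error types below are not
// taken from the repository; only the call shape (state + async callback +
// must_exit) is visible in the diff.
use std::io;
use std::path::Path;

use futures::future::{BoxFuture, FutureExt};
use tokio::{fs, sync::watch};

/// Recursively walk a data directory made of nested two-hex-character folders
/// and call `f` for every file whose name is a 64-character hex hash.
fn for_each_block_file<'a, F, Fut>(
	path: &'a Path,
	f: &'a F,
	must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, io::Result<()>>
where
	F: Fn([u8; 32]) -> Fut + Sync,
	Fut: std::future::Future<Output = io::Result<()>> + Send + 'a,
{
	async move {
		let mut dir = fs::read_dir(path).await?;
		while let Some(ent) = dir.next_entry().await? {
			let name = match ent.file_name().into_string() {
				Ok(n) => n,
				Err(_) => continue,
			};
			let ftype = ent.file_type().await?;
			if name.len() == 2 && hex::decode(&name).is_ok() && ftype.is_dir() {
				// Hash-prefix subdirectory: recurse one level deeper.
				for_each_block_file(&ent.path(), f, must_exit).await?;
			} else if name.len() == 64 {
				// Block file named by its full 32-byte hash in hex.
				if let Ok(bytes) = hex::decode(&name) {
					let mut hash = [0u8; 32];
					hash.copy_from_slice(&bytes);
					f(hash).await?;
				}
			}
			// Stop early if shutdown was requested, as the deleted code did.
			if *must_exit.borrow() {
				break;
			}
		}
		Ok(())
	}
	.boxed()
}
```

Passing the callback by reference keeps the recursive call simple; the real helper may instead thread an explicit state value through, as the `<F, Fut, State>` signature in the hunk suggests.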
@@ -585,7 +545,7 @@ impl BlockManager {
 		.boxed()
 	}
 
-	/// Verify integrity of each block on disc. Use `speed_limit` to limit the load generated by
+	/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
 	/// this function.
 	pub async fn verify_data_store_integrity(
 		&self,
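For readers skimming the diff, "verify integrity" for content-addressed blocks boils down to re-reading each file, re-hashing it, and comparing the digest against the file name. The next hunk suggests the real check happens inside `read_block` (which warns and queues a resync on failure), so the sketch below is only a generic illustration of the idea: the helper name, the SHA-256 choice, and the error handling are assumptions, not Garage's actual code.

```rust
// Illustration only: the hash algorithm and helper name are assumptions.
use std::path::Path;

use sha2::{Digest, Sha256};
use tokio::fs;

/// Re-hash a block file and check that the digest matches its hex file name.
async fn block_matches_name(path: &Path) -> std::io::Result<bool> {
	let data = fs::read(path).await?;
	let digest = Sha256::digest(&data);
	let expected = path
		.file_name()
		.and_then(|n| n.to_str())
		.unwrap_or_default();
	Ok(hex::encode(&digest[..]) == expected)
}
```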
@@ -601,7 +561,7 @@ impl BlockManager {
 			let len = match self.read_block(&hash).await {
 				Ok(Message::PutBlock(PutBlockMessage { data, .. })) => data.len(),
 				Ok(_) => unreachable!(),
-				Err(_) => todo!("log error"),
+				Err(_) => 0, // resync and warn message made by read_block if necessary
 			};
 
 			if let Some(speed_limit) = speed_limit {
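The hunk cuts off right where the `speed_limit` branch begins, so the actual rate-limiting code is not visible. One simple way such a limit can be enforced, sketched under the assumption that `speed_limit` is a bytes-per-second budget and that the caller runs on tokio (neither is confirmed by the diff), is to sleep in proportion to the bytes just read:

```rust
// Assumption-laden sketch: the real type and semantics of `speed_limit`
// are not shown in the diff.
use std::time::Duration;

/// After processing `len` bytes, pause long enough to keep the average
/// throughput at or below `speed_limit` bytes per second.
async fn throttle(len: usize, speed_limit: Option<usize>) {
	if let Some(limit) = speed_limit {
		if limit > 0 {
			let pause = Duration::from_secs_f64(len as f64 / limit as f64);
			tokio::time::sleep(pause).await;
		}
	}
}
```

Sleeping proportionally to the bytes just read keeps the long-run rate near the limit without any extra token-bucket state, which matches the shape of the visible code: `len` is computed per block, then `speed_limit` is checked.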