|
|
|
@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
|
|
|
|
|
use tokio::fs;
|
|
|
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
|
use tokio::sync::{watch, Mutex, Notify};
|
|
|
|
|
use tokio::time::Instant;
|
|
|
|
|
|
|
|
|
|
use garage_util::data::*;
|
|
|
|
|
use garage_util::error::Error;
|
|
|
|
@ -197,10 +198,15 @@ impl BlockManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2. Repair blocks actually on disk
|
|
|
|
|
self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
// Lists all blocks on disk and adds them to the resync queue.
|
|
|
|
|
// This allows us to find blocks we are storing but don't actually need,
|
|
|
|
|
// so that we can offload them if necessary and then delete them locally.
|
|
|
|
|
self.for_each_file(
|
|
|
|
|
(),
|
|
|
|
|
move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
|
|
|
|
|
must_exit,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get lenght of resync queue
|
|
|
|
@ -485,50 +491,107 @@ impl BlockManager {
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn repair_aux_read_dir_rec<'a>(
|
|
|
|
|
async fn for_each_file<F, Fut, State>(
|
|
|
|
|
&self,
|
|
|
|
|
state: State,
|
|
|
|
|
mut f: F,
|
|
|
|
|
must_exit: &watch::Receiver<bool>,
|
|
|
|
|
) -> Result<(), Error>
|
|
|
|
|
where
|
|
|
|
|
F: FnMut(State, Hash) -> Fut + Send,
|
|
|
|
|
Fut: Future<Output = Result<State, Error>> + Send,
|
|
|
|
|
State: Send,
|
|
|
|
|
{
|
|
|
|
|
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
|
|
|
|
|
.await
|
|
|
|
|
.map(|_| ())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn for_each_file_rec<'a, F, Fut, State>(
|
|
|
|
|
&'a self,
|
|
|
|
|
path: &'a Path,
|
|
|
|
|
mut state: State,
|
|
|
|
|
f: &'a mut F,
|
|
|
|
|
must_exit: &'a watch::Receiver<bool>,
|
|
|
|
|
) -> BoxFuture<'a, Result<(), Error>> {
|
|
|
|
|
// Lists all blocks on disk and adds them to the resync queue.
|
|
|
|
|
// This allows us to find blocks we are storing but don't actually need,
|
|
|
|
|
// so that we can offload them if necessary and then delete them locally.
|
|
|
|
|
) -> BoxFuture<'a, Result<State, Error>>
|
|
|
|
|
where
|
|
|
|
|
F: FnMut(State, Hash) -> Fut + Send,
|
|
|
|
|
Fut: Future<Output = Result<State, Error>> + Send,
|
|
|
|
|
State: Send + 'a,
|
|
|
|
|
{
|
|
|
|
|
async move {
|
|
|
|
|
let mut ls_data_dir = fs::read_dir(path).await?;
|
|
|
|
|
loop {
|
|
|
|
|
let data_dir_ent = ls_data_dir.next_entry().await?;
|
|
|
|
|
let data_dir_ent = match data_dir_ent {
|
|
|
|
|
Some(x) => x,
|
|
|
|
|
None => break,
|
|
|
|
|
};
|
|
|
|
|
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
|
|
|
|
|
if *must_exit.borrow() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let name = data_dir_ent.file_name();
|
|
|
|
|
let name = match name.into_string() {
|
|
|
|
|
Ok(x) => x,
|
|
|
|
|
Err(_) => continue,
|
|
|
|
|
let name = if let Ok(n) = name.into_string() {
|
|
|
|
|
n
|
|
|
|
|
} else {
|
|
|
|
|
continue;
|
|
|
|
|
};
|
|
|
|
|
let ent_type = data_dir_ent.file_type().await?;
|
|
|
|
|
|
|
|
|
|
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
|
|
|
|
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
|
|
|
|
|
state = self
|
|
|
|
|
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
|
|
|
|
|
.await?;
|
|
|
|
|
} else if name.len() == 64 {
|
|
|
|
|
let hash_bytes = match hex::decode(&name) {
|
|
|
|
|
Ok(h) => h,
|
|
|
|
|
Err(_) => continue,
|
|
|
|
|
let hash_bytes = if let Ok(h) = hex::decode(&name) {
|
|
|
|
|
h
|
|
|
|
|
} else {
|
|
|
|
|
continue;
|
|
|
|
|
};
|
|
|
|
|
let mut hash = [0u8; 32];
|
|
|
|
|
hash.copy_from_slice(&hash_bytes[..]);
|
|
|
|
|
self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if *must_exit.borrow() {
|
|
|
|
|
break;
|
|
|
|
|
state = f(state, hash.into()).await?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
Ok(state)
|
|
|
|
|
}
|
|
|
|
|
.boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
|
|
|
|
|
/// this function.
|
|
|
|
|
pub async fn verify_data_store_integrity(
|
|
|
|
|
&self,
|
|
|
|
|
must_exit: &watch::Receiver<bool>,
|
|
|
|
|
speed_limit: Option<usize>,
|
|
|
|
|
) -> Result<(), Error> {
|
|
|
|
|
let last_refill = Instant::now();
|
|
|
|
|
let token_left = speed_limit.unwrap_or(0);
|
|
|
|
|
self.for_each_file(
|
|
|
|
|
(last_refill, token_left),
|
|
|
|
|
move |(last_refill, token_left), hash| {
|
|
|
|
|
async move {
|
|
|
|
|
let len = match self.read_block(&hash).await {
|
|
|
|
|
Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
|
|
|
|
|
Ok(_) => unreachable!(),
|
|
|
|
|
Err(_) => 0, // resync and warn message made by read_block if necessary
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if let Some(speed_limit) = speed_limit {
|
|
|
|
|
// throttling logic
|
|
|
|
|
if let Some(t) = token_left.checked_sub(len) {
|
|
|
|
|
// token bucket not empty yet
|
|
|
|
|
Ok((last_refill, t))
|
|
|
|
|
} else {
|
|
|
|
|
// token bucket empty. Sleep and refill
|
|
|
|
|
tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
|
|
|
|
|
Ok((Instant::now(), speed_limit))
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
Ok((last_refill, token_left)) // actually not used
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
must_exit,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
@ -598,7 +661,7 @@ impl BlockManagerLocked {
|
|
|
|
|
);
|
|
|
|
|
let path = mgr.block_path(hash);
|
|
|
|
|
let mut path2 = path.clone();
|
|
|
|
|
path2.set_extension(".corrupted");
|
|
|
|
|
path2.set_extension("corrupted");
|
|
|
|
|
fs::rename(path, path2).await?;
|
|
|
|
|
mgr.put_to_resync(hash, Duration::from_millis(0))?;
|
|
|
|
|
Ok(())
|
|
|
|
|