forked from Deuxfleurs/garage
Compare commits
2 commits
4e8af1d956
...
6b47c294f5
Author | SHA1 | Date | |
---|---|---|---|
6b47c294f5 | |||
28c015d9ff |
5 changed files with 161 additions and 70 deletions
|
@ -265,7 +265,7 @@ pub struct RepairOpt {
|
||||||
pub yes: bool,
|
pub yes: bool,
|
||||||
|
|
||||||
#[structopt(subcommand)]
|
#[structopt(subcommand)]
|
||||||
pub what: Option<RepairWhat>,
|
pub what: RepairWhat,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
||||||
|
@ -282,6 +282,13 @@ pub enum RepairWhat {
|
||||||
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
|
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
|
||||||
#[structopt(name = "block_refs")]
|
#[structopt(name = "block_refs")]
|
||||||
BlockRefs,
|
BlockRefs,
|
||||||
|
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
|
||||||
|
#[structopt(name = "scrub")]
|
||||||
|
Scrub {
|
||||||
|
/// Limit on i/o speed, in B/s
|
||||||
|
#[structopt(name = "limit")]
|
||||||
|
limit: Option<usize>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
|
|
|
@ -27,42 +27,38 @@ impl Repair {
|
||||||
opt: RepairOpt,
|
opt: RepairOpt,
|
||||||
must_exit: watch::Receiver<bool>,
|
must_exit: watch::Receiver<bool>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
|
match opt.what {
|
||||||
|
RepairWhat::Tables => {
|
||||||
if todo(RepairWhat::Tables) {
|
info!("Launching a full sync of tables");
|
||||||
info!("Launching a full sync of tables");
|
self.garage.bucket_table.syncer.add_full_sync();
|
||||||
self.garage.bucket_table.syncer.add_full_sync();
|
self.garage.object_table.syncer.add_full_sync();
|
||||||
self.garage.object_table.syncer.add_full_sync();
|
self.garage.version_table.syncer.add_full_sync();
|
||||||
self.garage.version_table.syncer.add_full_sync();
|
self.garage.block_ref_table.syncer.add_full_sync();
|
||||||
self.garage.block_ref_table.syncer.add_full_sync();
|
self.garage.key_table.syncer.add_full_sync();
|
||||||
self.garage.key_table.syncer.add_full_sync();
|
}
|
||||||
|
RepairWhat::Versions => {
|
||||||
|
info!("Repairing the versions table");
|
||||||
|
self.repair_versions(&must_exit).await?;
|
||||||
|
}
|
||||||
|
RepairWhat::BlockRefs => {
|
||||||
|
info!("Repairing the block refs table");
|
||||||
|
self.repair_block_ref(&must_exit).await?;
|
||||||
|
}
|
||||||
|
RepairWhat::Blocks => {
|
||||||
|
info!("Repairing the stored blocks");
|
||||||
|
self.garage
|
||||||
|
.block_manager
|
||||||
|
.repair_data_store(&must_exit)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
RepairWhat::Scrub { limit } => {
|
||||||
|
info!("Verifying integrity of stored blocks");
|
||||||
|
self.garage
|
||||||
|
.block_manager
|
||||||
|
.scrub_data_store(&must_exit, limit)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: wait for full sync to finish before proceeding to the rest?
|
|
||||||
|
|
||||||
if todo(RepairWhat::Versions) {
|
|
||||||
info!("Repairing the versions table");
|
|
||||||
self.repair_versions(&must_exit).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if todo(RepairWhat::BlockRefs) {
|
|
||||||
info!("Repairing the block refs table");
|
|
||||||
self.repair_block_ref(&must_exit).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if opt.what.is_none() {
|
|
||||||
info!("Repairing the RC");
|
|
||||||
self.repair_rc(&must_exit).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if todo(RepairWhat::Blocks) {
|
|
||||||
info!("Repairing the stored blocks");
|
|
||||||
self.garage
|
|
||||||
.block_manager
|
|
||||||
.repair_data_store(&must_exit)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,10 +146,4 @@ impl Repair {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
|
||||||
// TODO
|
|
||||||
warn!("repair_rc: not implemented");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ use tokio::sync::{watch, Mutex, Notify};
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
use garage_util::token_bucket::TokenBucket;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
@ -197,10 +198,44 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Repair blocks actually on disk
|
// 2. Repair blocks actually on disk
|
||||||
self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
|
// Lists all blocks on disk and adds them to the resync queue.
|
||||||
.await?;
|
// This allows us to find blocks we are storing but don't actually need,
|
||||||
|
// so that we can offload them if necessary and then delete them locally.
|
||||||
|
self.for_each_file(
|
||||||
|
(),
|
||||||
|
move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) },
|
||||||
|
must_exit,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
|
||||||
|
/// this function.
|
||||||
|
pub async fn scrub_data_store(
|
||||||
|
&self,
|
||||||
|
must_exit: &watch::Receiver<bool>,
|
||||||
|
speed_limit: Option<usize>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
|
||||||
|
self.for_each_file(
|
||||||
|
token_bucket,
|
||||||
|
move |mut token_bucket, hash| {
|
||||||
|
async move {
|
||||||
|
let len = match self.read_block(&hash).await {
|
||||||
|
Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
|
||||||
|
Ok(_) => unreachable!(),
|
||||||
|
Err(_) => 0, // resync and warn message made by read_block if necessary
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(tb) = &mut token_bucket {
|
||||||
|
tb.take(len as u64).await;
|
||||||
|
}
|
||||||
|
Ok(token_bucket)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
must_exit,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get lenght of resync queue
|
/// Get lenght of resync queue
|
||||||
|
@ -485,47 +520,65 @@ impl BlockManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn repair_aux_read_dir_rec<'a>(
|
async fn for_each_file<F, Fut, State>(
|
||||||
|
&self,
|
||||||
|
state: State,
|
||||||
|
mut f: F,
|
||||||
|
must_exit: &watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
F: FnMut(State, Hash) -> Fut + Send,
|
||||||
|
Fut: Future<Output = Result<State, Error>> + Send,
|
||||||
|
State: Send,
|
||||||
|
{
|
||||||
|
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
|
||||||
|
.await
|
||||||
|
.map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn for_each_file_rec<'a, F, Fut, State>(
|
||||||
&'a self,
|
&'a self,
|
||||||
path: &'a Path,
|
path: &'a Path,
|
||||||
|
mut state: State,
|
||||||
|
f: &'a mut F,
|
||||||
must_exit: &'a watch::Receiver<bool>,
|
must_exit: &'a watch::Receiver<bool>,
|
||||||
) -> BoxFuture<'a, Result<(), Error>> {
|
) -> BoxFuture<'a, Result<State, Error>>
|
||||||
// Lists all blocks on disk and adds them to the resync queue.
|
where
|
||||||
// This allows us to find blocks we are storing but don't actually need,
|
F: FnMut(State, Hash) -> Fut + Send,
|
||||||
// so that we can offload them if necessary and then delete them locally.
|
Fut: Future<Output = Result<State, Error>> + Send,
|
||||||
|
State: Send + 'a,
|
||||||
|
{
|
||||||
async move {
|
async move {
|
||||||
let mut ls_data_dir = fs::read_dir(path).await?;
|
let mut ls_data_dir = fs::read_dir(path).await?;
|
||||||
loop {
|
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
|
||||||
let data_dir_ent = ls_data_dir.next_entry().await?;
|
if *must_exit.borrow() {
|
||||||
let data_dir_ent = match data_dir_ent {
|
break;
|
||||||
Some(x) => x,
|
}
|
||||||
None => break,
|
|
||||||
};
|
|
||||||
let name = data_dir_ent.file_name();
|
let name = data_dir_ent.file_name();
|
||||||
let name = match name.into_string() {
|
let name = if let Ok(n) = name.into_string() {
|
||||||
Ok(x) => x,
|
n
|
||||||
Err(_) => continue,
|
} else {
|
||||||
|
continue;
|
||||||
};
|
};
|
||||||
let ent_type = data_dir_ent.file_type().await?;
|
let ent_type = data_dir_ent.file_type().await?;
|
||||||
|
|
||||||
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
||||||
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
|
state = self
|
||||||
|
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
|
||||||
.await?;
|
.await?;
|
||||||
} else if name.len() == 64 {
|
} else if name.len() == 64 {
|
||||||
let hash_bytes = match hex::decode(&name) {
|
let hash_bytes = if let Ok(h) = hex::decode(&name) {
|
||||||
Ok(h) => h,
|
h
|
||||||
Err(_) => continue,
|
} else {
|
||||||
|
continue;
|
||||||
};
|
};
|
||||||
let mut hash = [0u8; 32];
|
let mut hash = [0u8; 32];
|
||||||
hash.copy_from_slice(&hash_bytes[..]);
|
hash.copy_from_slice(&hash_bytes[..]);
|
||||||
self.put_to_resync(&hash.into(), Duration::from_secs(0))?;
|
state = f(state, hash.into()).await?;
|
||||||
}
|
|
||||||
|
|
||||||
if *must_exit.borrow() {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(state)
|
||||||
}
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
@ -598,7 +651,7 @@ impl BlockManagerLocked {
|
||||||
);
|
);
|
||||||
let path = mgr.block_path(hash);
|
let path = mgr.block_path(hash);
|
||||||
let mut path2 = path.clone();
|
let mut path2 = path.clone();
|
||||||
path2.set_extension(".corrupted");
|
path2.set_extension("corrupted");
|
||||||
fs::rename(path, path2).await?;
|
fs::rename(path, path2).await?;
|
||||||
mgr.put_to_resync(hash, Duration::from_millis(0))?;
|
mgr.put_to_resync(hash, Duration::from_millis(0))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -9,3 +9,4 @@ pub mod data;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod persister;
|
pub mod persister;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
pub mod token_bucket;
|
||||||
|
|
40
src/util/token_bucket.rs
Normal file
40
src/util/token_bucket.rs
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
pub struct TokenBucket {
|
||||||
|
// Replenish rate: number of tokens per second
|
||||||
|
replenish_rate: u64,
|
||||||
|
// Current number of tokens
|
||||||
|
tokens: u64,
|
||||||
|
// Last replenish time
|
||||||
|
last_replenish: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TokenBucket {
|
||||||
|
pub fn new(replenish_rate: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
replenish_rate,
|
||||||
|
tokens: 0,
|
||||||
|
last_replenish: Instant::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn take(&mut self, tokens: u64) {
|
||||||
|
while self.tokens < tokens {
|
||||||
|
let needed = tokens - self.tokens;
|
||||||
|
let delay = (needed as f64) / (self.replenish_rate as f64);
|
||||||
|
sleep(Duration::from_secs_f64(delay)).await;
|
||||||
|
self.replenish();
|
||||||
|
}
|
||||||
|
self.tokens -= tokens;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn replenish(&mut self) {
|
||||||
|
let now = Instant::now();
|
||||||
|
let new_tokens =
|
||||||
|
((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
|
||||||
|
self.tokens += new_tokens;
|
||||||
|
self.last_replenish = now;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue