forked from Deuxfleurs/garage
Refactoring on repair commands
parent 28c015d9ff
commit 6b47c294f5
5 changed files with 105 additions and 92 deletions
@@ -265,7 +265,7 @@ pub struct RepairOpt {
     pub yes: bool,

     #[structopt(subcommand)]
-    pub what: Option<RepairWhat>,
+    pub what: RepairWhat,
 }

 #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
@@ -283,8 +283,8 @@ pub enum RepairWhat {
     #[structopt(name = "block_refs")]
     BlockRefs,
     /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
-    #[structopt(name = "blocks_integrity")]
-    BlockIntegrity {
+    #[structopt(name = "scrub")]
+    Scrub {
         /// Limit on i/o speed, in B/s
         #[structopt(name = "limit")]
         limit: Option<usize>,
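Editor's note: a minimal sketch (not part of the commit) of how the reworked options parse, assuming structopt 0.3 and that `yes` carries a flag attribute in lines outside the hunk. Making `what` a plain `RepairWhat` instead of `Option<RepairWhat>` turns the subcommand into a required argument, so a bare `garage repair --yes` is now rejected at parse time.

```rust
// Hypothetical, reduced copy of the new definitions, for illustration only.
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
enum RepairWhat {
    /// Launch a full sync of tables
    #[structopt(name = "tables")]
    Tables,
    /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
    #[structopt(name = "scrub")]
    Scrub {
        /// Limit on i/o speed, in B/s
        // No `long`/`short` attribute in the hunk, so structopt treats this
        // as an optional positional argument.
        #[structopt(name = "limit")]
        limit: Option<usize>,
    },
}

#[derive(StructOpt, Debug)]
struct RepairOpt {
    // Flag attribute assumed; the `yes` field's attribute is not shown in the hunk.
    #[structopt(long = "yes")]
    yes: bool,

    #[structopt(subcommand)]
    what: RepairWhat, // required: a bare `repair --yes` no longer parses
}

fn main() {
    // e.g. `repair --yes scrub 10000000` -> Scrub { limit: Some(10000000) }
    let opt = RepairOpt::from_iter(vec!["repair", "--yes", "scrub", "10000000"]);
    println!("{:?}", opt);
}
```

With the old `Option<RepairWhat>`, omitting the subcommand meant "run every repair"; that default disappears along with the `todo` closure in the next file.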
@@ -27,50 +27,38 @@ impl Repair {
         opt: RepairOpt,
         must_exit: watch::Receiver<bool>,
     ) -> Result<(), Error> {
-        let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
-
-        if todo(RepairWhat::Tables) {
-            info!("Launching a full sync of tables");
-            self.garage.bucket_table.syncer.add_full_sync();
-            self.garage.object_table.syncer.add_full_sync();
-            self.garage.version_table.syncer.add_full_sync();
-            self.garage.block_ref_table.syncer.add_full_sync();
-            self.garage.key_table.syncer.add_full_sync();
-        }
-
-        // TODO: wait for full sync to finish before proceeding to the rest?
-
-        if todo(RepairWhat::Versions) {
-            info!("Repairing the versions table");
-            self.repair_versions(&must_exit).await?;
-        }
-
-        if todo(RepairWhat::BlockRefs) {
-            info!("Repairing the block refs table");
-            self.repair_block_ref(&must_exit).await?;
-        }
-
-        if opt.what.is_none() {
-            info!("Repairing the RC");
-            self.repair_rc(&must_exit).await?;
-        }
-
-        if todo(RepairWhat::Blocks) {
-            info!("Repairing the stored blocks");
-            self.garage
-                .block_manager
-                .repair_data_store(&must_exit)
-                .await?;
-        }
-
-        if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what {
-            info!("Verifying integrity of stored blocks");
-            self.garage
-                .block_manager
-                .verify_data_store_integrity(&must_exit, limit)
-                .await?;
-        }
+        match opt.what {
+            RepairWhat::Tables => {
+                info!("Launching a full sync of tables");
+                self.garage.bucket_table.syncer.add_full_sync();
+                self.garage.object_table.syncer.add_full_sync();
+                self.garage.version_table.syncer.add_full_sync();
+                self.garage.block_ref_table.syncer.add_full_sync();
+                self.garage.key_table.syncer.add_full_sync();
+            }
+            RepairWhat::Versions => {
+                info!("Repairing the versions table");
+                self.repair_versions(&must_exit).await?;
+            }
+            RepairWhat::BlockRefs => {
+                info!("Repairing the block refs table");
+                self.repair_block_ref(&must_exit).await?;
+            }
+            RepairWhat::Blocks => {
+                info!("Repairing the stored blocks");
+                self.garage
+                    .block_manager
+                    .repair_data_store(&must_exit)
+                    .await?;
+            }
+            RepairWhat::Scrub { limit } => {
+                info!("Verifying integrity of stored blocks");
+                self.garage
+                    .block_manager
+                    .scrub_data_store(&must_exit, limit)
+                    .await?;
+            }
+        }

         Ok(())
     }
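Editor's note: a reduced sketch (not from the commit) of what the refactor buys. Because `opt.what` is no longer an `Option`, the dispatch above is an exhaustive `match`: adding a new `RepairWhat` variant without handling it becomes a compile error, whereas the old `todo` closure compared variants at runtime and silently skipped anything unhandled.

```rust
// Hypothetical, reduced copy of the enum for illustration.
#[derive(PartialEq)]
enum What { Tables, Versions }

// Old style: runtime comparison; an unhandled variant is simply never run.
fn run_old(what: Option<What>) {
    let todo = |x| what.as_ref().map(|y| *y == x).unwrap_or(true);
    if todo(What::Tables) { /* sync tables */ }
    if todo(What::Versions) { /* repair versions table */ }
}

// New style: the compiler requires an arm for every variant.
fn run_new(what: What) {
    match what {
        What::Tables => { /* sync tables */ }
        What::Versions => { /* repair versions table */ }
        // adding a third variant without an arm here fails to compile
    }
}

fn main() {
    run_old(None);
    run_new(What::Tables);
}
```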
@@ -158,10 +146,4 @@ impl Repair {
         }
         Ok(())
     }
-
-    async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
-        // TODO
-        warn!("repair_rc: not implemented");
-        Ok(())
-    }
 }
@@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize};
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::sync::{watch, Mutex, Notify};
-use tokio::time::Instant;

 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::time::*;
+use garage_util::token_bucket::TokenBucket;

 use garage_rpc::system::System;
 use garage_rpc::*;
@@ -209,6 +209,35 @@ impl BlockManager {
         .await
     }

+    /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
+    /// this function.
+    pub async fn scrub_data_store(
+        &self,
+        must_exit: &watch::Receiver<bool>,
+        speed_limit: Option<usize>,
+    ) -> Result<(), Error> {
+        let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
+        self.for_each_file(
+            token_bucket,
+            move |mut token_bucket, hash| {
+                async move {
+                    let len = match self.read_block(&hash).await {
+                        Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
+                        Ok(_) => unreachable!(),
+                        Err(_) => 0, // resync and warn message made by read_block if necessary
+                    };
+
+                    if let Some(tb) = &mut token_bucket {
+                        tb.take(len as u64).await;
+                    }
+                    Ok(token_bucket)
+                }
+            },
+            must_exit,
+        )
+        .await
+    }
+
     /// Get lenght of resync queue
     pub fn resync_queue_len(&self) -> usize {
         self.resync_queue.len()
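Editor's note: the new scrub threads the optional `TokenBucket` through `for_each_file` as fold state and charges it with the number of bytes just read. Below is a minimal sketch of that pattern (illustration only, not part of the commit), with a made-up list of block lengths standing in for the data files and a tokio runtime assumed.

```rust
use garage_util::token_bucket::TokenBucket;

// Hypothetical stand-in for iterating over the data files of a block store.
async fn scrub_like(block_lengths: Vec<usize>, speed_limit: Option<usize>) {
    // One bucket per scrub run, only if a limit was requested.
    let mut token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
    for len in block_lengths {
        // ... read and verify one block here; `len` is the number of bytes read ...
        if let Some(tb) = &mut token_bucket {
            // Charge the bucket for those bytes; this sleeps whenever the
            // configured byte rate has been exceeded.
            tb.take(len as u64).await;
        }
    }
}

#[tokio::main]
async fn main() {
    // Scrub three 1 MiB "blocks" at no more than 10 MB/s.
    scrub_like(vec![1 << 20; 3], Some(10_000_000)).await;
}
```

Compared with the removed `verify_data_store_integrity` further down, the throttling state now lives in one reusable type instead of a `(last_refill, token_left)` tuple managed by hand.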
@@ -553,45 +582,6 @@ impl BlockManager {
         }
         .boxed()
     }
-
-    /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
-    /// this function.
-    pub async fn verify_data_store_integrity(
-        &self,
-        must_exit: &watch::Receiver<bool>,
-        speed_limit: Option<usize>,
-    ) -> Result<(), Error> {
-        let last_refill = Instant::now();
-        let token_left = speed_limit.unwrap_or(0);
-        self.for_each_file(
-            (last_refill, token_left),
-            move |(last_refill, token_left), hash| {
-                async move {
-                    let len = match self.read_block(&hash).await {
-                        Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
-                        Ok(_) => unreachable!(),
-                        Err(_) => 0, // resync and warn message made by read_block if necessary
-                    };
-
-                    if let Some(speed_limit) = speed_limit {
-                        // throttling logic
-                        if let Some(t) = token_left.checked_sub(len) {
-                            // token bucket not empty yet
-                            Ok((last_refill, t))
-                        } else {
-                            // token bucket empty. Sleep and refill
-                            tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
-                            Ok((Instant::now(), speed_limit))
-                        }
-                    } else {
-                        Ok((last_refill, token_left)) // actually not used
-                    }
-                }
-            },
-            must_exit,
-        )
-        .await
-    }
 }

 #[async_trait]
@@ -9,3 +9,4 @@ pub mod data;
 pub mod error;
 pub mod persister;
 pub mod time;
+pub mod token_bucket;
src/util/token_bucket.rs (new file, 40 lines)
@@ -0,0 +1,40 @@
+use std::time::{Duration, Instant};
+
+use tokio::time::sleep;
+
+pub struct TokenBucket {
+    // Replenish rate: number of tokens per second
+    replenish_rate: u64,
+    // Current number of tokens
+    tokens: u64,
+    // Last replenish time
+    last_replenish: Instant,
+}
+
+impl TokenBucket {
+    pub fn new(replenish_rate: u64) -> Self {
+        Self {
+            replenish_rate,
+            tokens: 0,
+            last_replenish: Instant::now(),
+        }
+    }
+
+    pub async fn take(&mut self, tokens: u64) {
+        while self.tokens < tokens {
+            let needed = tokens - self.tokens;
+            let delay = (needed as f64) / (self.replenish_rate as f64);
+            sleep(Duration::from_secs_f64(delay)).await;
+            self.replenish();
+        }
+        self.tokens -= tokens;
+    }
+
+    pub fn replenish(&mut self) {
+        let now = Instant::now();
+        let new_tokens =
+            ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
+        self.tokens += new_tokens;
+        self.last_replenish = now;
+    }
+}
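Editor's note: a small usage sketch of the new type (illustration only, not part of the commit; it assumes a binary depending on garage_util and a tokio runtime with the macros and time features). The bucket starts empty, so each take() sleeps just long enough for replenishment: three takes of 500 tokens at a replenish rate of 1000 tokens/s should complete in roughly 1.5 seconds.

```rust
use std::time::Instant;

use garage_util::token_bucket::TokenBucket;

#[tokio::main]
async fn main() {
    // 1000 tokens (e.g. bytes) per second.
    let mut bucket = TokenBucket::new(1000);

    let start = Instant::now();
    for _ in 0..3 {
        // Each call sleeps until enough tokens have been replenished.
        bucket.take(500).await;
    }
    // 3 * 500 tokens at 1000 tokens/s: roughly 1.5 s.
    println!("elapsed: {:?}", start.elapsed());
}
```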