Compare commits

...

1 commit

Author SHA1 Message Date
Jonathan Davies
41d4817eeb block/repair.rs: Initial implementation of scrub checkpointing.
All checks were successful
continuous-integration/drone/pr Build is passing
2023-06-08 13:12:03 +01:00

View file

@ -136,7 +136,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue. // Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need, // This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally. // so that we can offload them if necessary and then delete them locally.
if let Some(hash) = bi.next().await? { if let Some(hash) = bi.next(None).await? {
self.manager self.manager
.resync .resync
.put_to_resync(&hash, Duration::from_secs(0))?; .put_to_resync(&hash, Duration::from_secs(0))?;
@ -204,7 +204,39 @@ mod v082 {
} }
} }
pub use v082::*; mod v083 {
use serde::{Deserialize, Serialize};
use garage_util::data::Hash;
use super::v082;
#[derive(Serialize, Deserialize)]
pub struct ScrubWorkerPersisted {
pub tranquility: u32,
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
pub(crate) last_hash_checked: Option<Hash>,
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
type Previous = v082::ScrubWorkerPersisted;
const VERSION_MARKER: &'static [u8] = b"G083bswp";
fn migrate(old: v082::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
ScrubWorkerPersisted {
tranquility: old.tranquility,
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: old.time_last_complete_scrub,
corruptions_detected: old.corruptions_detected,
last_hash_checked: None,
}
}
}
}
pub use v083::*;
pub struct ScrubWorker { pub struct ScrubWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
@ -235,6 +267,7 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY, tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0, corruptions_detected: 0,
last_hash_checked: None,
} }
} }
} }
@ -376,7 +409,10 @@ impl Worker for ScrubWorker {
match &mut self.work { match &mut self.work {
ScrubWorkerState::Running(bsi) => { ScrubWorkerState::Running(bsi) => {
self.tranquilizer.reset(); self.tranquilizer.reset();
if let Some(hash) = bsi.next().await? {
let has_last_hash_checked = self.persister.get_with(|p| p.last_hash_checked);
if let Some(hash) = bsi.next(has_last_hash_checked).await? {
match self.manager.read_block(&hash).await { match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => { Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash); error!("Found corrupt data block during scrub: {:?}", hash);
@ -385,6 +421,10 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e), Err(e) => return Err(e),
_ => (), _ => (),
}; };
self.persister
.set_with(|p| p.last_hash_checked = Some(hash))?;
Ok(self Ok(self
.tranquilizer .tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility))) .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
@ -487,7 +527,7 @@ impl BlockStoreIterator {
} }
} }
async fn next(&mut self) -> Result<Option<Hash>, Error> { async fn next(&mut self, last_hash: Option<Hash>) -> Result<Option<Hash>, Error> {
loop { loop {
let last_path = match self.path.last_mut() { let last_path = match self.path.last_mut() {
None => return Ok(None), None => return Ok(None),