From 41d4817eeb4210f9705b5a0f2de6545af0f80b9f Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 11 May 2023 22:14:25 +0100 Subject: [PATCH] block/repair.rs: Initial implementation of scrub checkpointing. --- src/block/repair.rs | 48 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/src/block/repair.rs b/src/block/repair.rs index 71093d69..6533752d 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -136,7 +136,7 @@ impl Worker for RepairWorker { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. - if let Some(hash) = bi.next().await? { + if let Some(hash) = bi.next(None).await? { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; @@ -204,7 +204,39 @@ mod v082 { } } -pub use v082::*; +mod v083 { + use serde::{Deserialize, Serialize}; + + use garage_util::data::Hash; + + use super::v082; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) time_next_run_scrub: u64, + pub(crate) corruptions_detected: u64, + pub(crate) last_hash_checked: Option, + } + + impl garage_util::migrate::Migrate for ScrubWorkerPersisted { + type Previous = v082::ScrubWorkerPersisted; + const VERSION_MARKER: &'static [u8] = b"G083bswp"; + + fn migrate(old: v082::ScrubWorkerPersisted) -> ScrubWorkerPersisted { + ScrubWorkerPersisted { + tranquility: old.tranquility, + time_last_complete_scrub: old.time_last_complete_scrub, + time_next_run_scrub: old.time_last_complete_scrub, + corruptions_detected: old.corruptions_detected, + last_hash_checked: None, + } + } + } +} + +pub use v083::*; pub struct ScrubWorker { manager: Arc, @@ -235,6 +267,7 @@ impl Default for ScrubWorkerPersisted { time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, + last_hash_checked: None, } } } @@ -376,7 +409,10 @@ impl Worker for ScrubWorker { match &mut self.work { ScrubWorkerState::Running(bsi) => { self.tranquilizer.reset(); - if let Some(hash) = bsi.next().await? { + + let has_last_hash_checked = self.persister.get_with(|p| p.last_hash_checked); + + if let Some(hash) = bsi.next(has_last_hash_checked).await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -385,6 +421,10 @@ impl Worker for ScrubWorker { Err(e) => return Err(e), _ => (), }; + + self.persister + .set_with(|p| p.last_hash_checked = Some(hash))?; + Ok(self .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) @@ -487,7 +527,7 @@ impl BlockStoreIterator { } } - async fn next(&mut self) -> Result, Error> { + async fn next(&mut self, last_hash: Option) -> Result, Error> { loop { let last_path = match self.path.last_mut() { None => return Ok(None), -- 2.45.2