WIP: block/repair.rs: Initial implementation of scrub checkpointing #582
1 changed files with 44 additions and 4 deletions
|
@ -136,7 +136,7 @@ impl Worker for RepairWorker {
|
||||||
// Lists all blocks on disk and adds them to the resync queue.
|
// Lists all blocks on disk and adds them to the resync queue.
|
||||||
// This allows us to find blocks we are storing but don't actually need,
|
// This allows us to find blocks we are storing but don't actually need,
|
||||||
// so that we can offload them if necessary and then delete them locally.
|
// so that we can offload them if necessary and then delete them locally.
|
||||||
if let Some(hash) = bi.next().await? {
|
if let Some(hash) = bi.next(None).await? {
|
||||||
self.manager
|
self.manager
|
||||||
.resync
|
.resync
|
||||||
.put_to_resync(&hash, Duration::from_secs(0))?;
|
.put_to_resync(&hash, Duration::from_secs(0))?;
|
||||||
|
@ -204,7 +204,39 @@ mod v082 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub use v082::*;
|
mod v083 {
|
||||||
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
use super::v082;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct ScrubWorkerPersisted {
|
||||||
|
pub tranquility: u32,
|
||||||
|
pub(crate) time_last_complete_scrub: u64,
|
||||||
|
pub(crate) time_next_run_scrub: u64,
|
||||||
|
pub(crate) corruptions_detected: u64,
|
||||||
|
pub(crate) last_hash_checked: Option<Hash>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
|
||||||
|
type Previous = v082::ScrubWorkerPersisted;
|
||||||
|
const VERSION_MARKER: &'static [u8] = b"G083bswp";
|
||||||
|
|
||||||
|
fn migrate(old: v082::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
|
||||||
|
ScrubWorkerPersisted {
|
||||||
|
tranquility: old.tranquility,
|
||||||
|
time_last_complete_scrub: old.time_last_complete_scrub,
|
||||||
|
time_next_run_scrub: old.time_last_complete_scrub,
|
||||||
|
corruptions_detected: old.corruptions_detected,
|
||||||
|
last_hash_checked: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use v083::*;
|
||||||
|
|
||||||
pub struct ScrubWorker {
|
pub struct ScrubWorker {
|
||||||
manager: Arc<BlockManager>,
|
manager: Arc<BlockManager>,
|
||||||
|
@ -235,6 +267,7 @@ impl Default for ScrubWorkerPersisted {
|
||||||
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
|
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
|
||||||
tranquility: INITIAL_SCRUB_TRANQUILITY,
|
tranquility: INITIAL_SCRUB_TRANQUILITY,
|
||||||
corruptions_detected: 0,
|
corruptions_detected: 0,
|
||||||
|
last_hash_checked: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,7 +409,10 @@ impl Worker for ScrubWorker {
|
||||||
match &mut self.work {
|
match &mut self.work {
|
||||||
ScrubWorkerState::Running(bsi) => {
|
ScrubWorkerState::Running(bsi) => {
|
||||||
self.tranquilizer.reset();
|
self.tranquilizer.reset();
|
||||||
if let Some(hash) = bsi.next().await? {
|
|
||||||
|
let has_last_hash_checked = self.persister.get_with(|p| p.last_hash_checked);
|
||||||
|
|
||||||
|
if let Some(hash) = bsi.next(has_last_hash_checked).await? {
|
||||||
match self.manager.read_block(&hash).await {
|
match self.manager.read_block(&hash).await {
|
||||||
Err(Error::CorruptData(_)) => {
|
Err(Error::CorruptData(_)) => {
|
||||||
error!("Found corrupt data block during scrub: {:?}", hash);
|
error!("Found corrupt data block during scrub: {:?}", hash);
|
||||||
|
@ -385,6 +421,10 @@ impl Worker for ScrubWorker {
|
||||||
Err(e) => return Err(e),
|
Err(e) => return Err(e),
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
self.persister
|
||||||
|
.set_with(|p| p.last_hash_checked = Some(hash))?;
|
||||||
|
|
||||||
Ok(self
|
Ok(self
|
||||||
.tranquilizer
|
.tranquilizer
|
||||||
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
|
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
|
||||||
|
@ -487,7 +527,7 @@ impl BlockStoreIterator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn next(&mut self) -> Result<Option<Hash>, Error> {
|
async fn next(&mut self, last_hash: Option<Hash>) -> Result<Option<Hash>, Error> {
|
||||||
loop {
|
loop {
|
||||||
let last_path = match self.path.last_mut() {
|
let last_path = match self.path.last_mut() {
|
||||||
None => return Ok(None),
|
None => return Ok(None),
|
||||||
|
|
Loading…
Reference in a new issue
BTW, you do not need to add this whole module with a new migration. Since the value of `last_hash_checked`, if it was not present before, is its `Default::default()` value (`None`), you can just add it to the struct in `v082` with the attribute `#[serde(default)]`.