Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
Showing only changes of commit b0a181e17e - Show all commits

View file

@ -148,6 +148,7 @@ pub struct ScrubWorker {
struct ScrubWorkerPersisted {
tranquility: u32,
time_last_complete_scrub: u64,
corruptions_detected: u64,
}
enum ScrubWorkerState {
@ -179,6 +180,7 @@ impl ScrubWorker {
Err(_) => ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: 4,
corruptions_detected: 0,
},
};
Self {
@ -253,20 +255,21 @@ impl Worker for ScrubWorker {
}
fn info(&self) -> Option<String> {
match &self.work {
ScrubWorkerState::Running(bsi) => Some(format!(
let s = match &self.work {
ScrubWorkerState::Running(bsi) => format!(
"{:.2}% done (tranquility = {})",
bsi.progress() * 100.,
self.persisted.tranquility
)),
),
ScrubWorkerState::Paused(_bsi, rt) => {
Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt)))
format!("Paused, resumes at {}", msec_to_rfc3339(*rt))
}
ScrubWorkerState::Finished => Some(format!(
ScrubWorkerState::Finished => format!(
"Last completed scrub: {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub)
)),
}
),
};
Some(format!("{} ; corruptions detected: {}", s, self.persisted.corruptions_detected))
}
async fn work(
@ -283,7 +286,15 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(bsi) => {
self.tranquilizer.reset();
if let Some(hash) = bsi.next().await? {
let _ = self.manager.read_block(&hash).await;
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
self.persisted.corruptions_detected += 1;
self.persister.save_async(&self.persisted).await?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
.tranquilize_worker(self.persisted.tranquility))