Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
Showing only changes of commit 4cc9a648ab - Show all commits

View file

@ -261,8 +261,12 @@ impl Worker for ScrubWorker {
bsi.progress() * 100., bsi.progress() * 100.,
self.persisted.tranquility self.persisted.tranquility
), ),
ScrubWorkerState::Paused(_bsi, rt) => { ScrubWorkerState::Paused(bsi, rt) => {
format!("Paused, resumes at {}", msec_to_rfc3339(*rt)) format!(
"Paused, {:.2}% done, resumes at {}",
bsi.progress() * 100.,
msec_to_rfc3339(*rt)
)
} }
ScrubWorkerState::Finished => format!( ScrubWorkerState::Finished => format!(
"Last completed scrub: {}", "Last completed scrub: {}",
@ -314,44 +318,30 @@ impl Worker for ScrubWorker {
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
match &self.work { let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerStatus::Busy, ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
ScrubWorkerState::Paused(_, resume_time) => { ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
let now = now_msec(); ScrubWorkerState::Finished => (
if now >= *resume_time { self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
self.handle_cmd(ScrubWorkerCommand::Resume).await; ScrubWorkerCommand::Start,
return WorkerStatus::Busy; ),
} };
let delay = Duration::from_millis(*resume_time - now);
select! { let now = now_msec();
_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume).await, if now >= wait_until {
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { self.handle_cmd(command).await;
self.handle_cmd(cmd).await; return WorkerStatus::Busy;
} else { }
return WorkerStatus::Done; let delay = Duration::from_millis(wait_until - now);
} select! {
} _ = tokio::time::sleep(delay) => self.handle_cmd(command).await,
} cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
ScrubWorkerState::Finished => { self.handle_cmd(cmd).await;
let now = now_msec(); } else {
if now - self.persisted.time_last_complete_scrub return WorkerStatus::Done;
>= SCRUB_INTERVAL.as_millis() as u64
{
self.handle_cmd(ScrubWorkerCommand::Start).await;
return WorkerStatus::Busy;
}
let delay = SCRUB_INTERVAL
- Duration::from_millis(now - self.persisted.time_last_complete_scrub);
select! {
_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start).await,
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
self.handle_cmd(cmd).await;
} else {
return WorkerStatus::Done;
}
}
} }
} }
match &self.work { match &self.work {
ScrubWorkerState::Running(_) => WorkerStatus::Busy, ScrubWorkerState::Running(_) => WorkerStatus::Busy,
_ => WorkerStatus::Idle, _ => WorkerStatus::Idle,