Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
7 changed files with 9 additions and 33 deletions
Showing only changes of commit 08cd5f2f1d - Show all commits

2
Cargo.lock generated
View file

@ -1737,7 +1737,7 @@ dependencies = [
[[package]]
name = "k2v-client"
version = "0.1.0"
version = "0.0.1"
dependencies = [
"base64",
"clap 3.1.18",

View file

@ -773,10 +773,7 @@ impl Worker for ResyncWorker {
}
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
self.tranquilizer.reset();
match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self

View file

@ -62,10 +62,7 @@ impl Worker for RepairWorker {
}
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.block_iter.as_mut() {
None => {
// Phase 1: Repair blocks from RC table.
@ -279,10 +276,7 @@ impl Worker for ScrubWorker {
))
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.rx_cmd.try_recv() {
Ok(cmd) => self.handle_cmd(cmd).await,
Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),

View file

@ -89,10 +89,7 @@ impl Worker for RepairVersionsWorker {
Some(format!("{} items done", self.counter))
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => {
self.pos = k;
@ -170,10 +167,7 @@ impl Worker for RepairBlockrefsWorker {
Some(format!("{} items done", self.counter))
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => {
self.pos = k;

View file

@ -344,10 +344,7 @@ where
}
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.gc.gc_loop_iter().await? {
None => Ok(WorkerState::Busy),
Some(delay) => {

View file

@ -322,10 +322,7 @@ where
}
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let updater = self.0.clone();
tokio::task::spawn_blocking(move || {
for _i in 0..100 {

View file

@ -21,10 +21,7 @@ impl Worker for JobWorker {
format!("Job worker #{}", self.index)
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, Error> {
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.next_job.take() {
None => return Ok(WorkerState::Idle),
Some(job) => {