forked from Deuxfleurs/garage
Alex
4f38cadf6e
- [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: Deuxfleurs/garage#332 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
48 lines
1.1 KiB
Rust
48 lines
1.1 KiB
Rust
//! Job worker: a generic worker that just processes incoming
|
|
//! jobs one by one
|
|
|
|
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
|
|
use crate::background::worker::*;
|
|
use crate::background::*;
|
|
|
|
pub(crate) struct JobWorker {
|
|
pub(crate) index: usize,
|
|
pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
|
|
pub(crate) next_job: Option<Job>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Worker for JobWorker {
|
|
fn name(&self) -> String {
|
|
format!("Job worker #{}", self.index)
|
|
}
|
|
|
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
|
match self.next_job.take() {
|
|
None => return Ok(WorkerState::Idle),
|
|
Some(job) => {
|
|
job.await?;
|
|
Ok(WorkerState::Busy)
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
|
|
loop {
|
|
match self.job_chan.lock().await.recv().await {
|
|
Some((job, cancellable)) => {
|
|
if cancellable && *must_exit.borrow() {
|
|
continue;
|
|
}
|
|
self.next_job = Some(job);
|
|
return WorkerState::Busy;
|
|
}
|
|
None => return WorkerState::Done,
|
|
}
|
|
}
|
|
}
|
|
}
|