Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
12 changed files with 105 additions and 105 deletions
Showing only changes of commit d1cf1a0fa6 - Show all commits

View file

@ -776,16 +776,16 @@ impl Worker for ResyncWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
self.tranquilizer.reset(); self.tranquilizer.reset();
match self.manager.resync_iter().await { match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer .tranquilizer
.tranquilize_worker(self.manager.background_tranquility)), .tranquilize_worker(self.manager.background_tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => { Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay; self.next_delay = delay;
Ok(WorkerStatus::Idle) Ok(WorkerState::Idle)
} }
Err(e) => { Err(e) => {
// The errors that we have here are only Sled errors // The errors that we have here are only Sled errors
@ -799,12 +799,12 @@ impl Worker for ResyncWorker {
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
select! { select! {
_ = tokio::time::sleep(self.next_delay) => (), _ = tokio::time::sleep(self.next_delay) => (),
_ = self.manager.resync_notify.notified() => (), _ = self.manager.resync_notify.notified() => (),
}; };
WorkerStatus::Busy WorkerState::Busy
} }
} }

View file

@ -65,7 +65,7 @@ impl Worker for RepairWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
match self.block_iter.as_mut() { match self.block_iter.as_mut() {
None => { None => {
// Phase 1: Repair blocks from RC table. // Phase 1: Repair blocks from RC table.
@ -101,7 +101,7 @@ impl Worker for RepairWorker {
if batch_of_hashes.is_empty() { if batch_of_hashes.is_empty() {
// move on to phase 2 // move on to phase 2
self.block_iter = Some(BlockStoreIterator::new(&self.manager)); self.block_iter = Some(BlockStoreIterator::new(&self.manager));
return Ok(WorkerStatus::Busy); return Ok(WorkerState::Busy);
} }
for hash in batch_of_hashes.into_iter() { for hash in batch_of_hashes.into_iter() {
@ -109,7 +109,7 @@ impl Worker for RepairWorker {
self.next_start = Some(hash) self.next_start = Some(hash)
} }
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} }
Some(bi) => { Some(bi) => {
// Phase 2: Repair blocks actually on disk // Phase 2: Repair blocks actually on disk
@ -118,15 +118,15 @@ impl Worker for RepairWorker {
// so that we can offload them if necessary and then delete them locally. // so that we can offload them if necessary and then delete them locally.
if let Some(hash) = bi.next().await? { if let Some(hash) = bi.next().await? {
self.manager.put_to_resync(&hash, Duration::from_secs(0))?; self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} else { } else {
Ok(WorkerStatus::Done) Ok(WorkerState::Done)
} }
} }
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!() unreachable!()
} }
} }
@ -282,10 +282,10 @@ impl Worker for ScrubWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
match self.rx_cmd.try_recv() { match self.rx_cmd.try_recv() {
Ok(cmd) => self.handle_cmd(cmd).await, Ok(cmd) => self.handle_cmd(cmd).await,
Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done), Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
Err(mpsc::error::TryRecvError::Empty) => (), Err(mpsc::error::TryRecvError::Empty) => (),
}; };
@ -310,16 +310,16 @@ impl Worker for ScrubWorker {
self.persister.save_async(&self.persisted).await?; self.persister.save_async(&self.persisted).await?;
self.work = ScrubWorkerState::Finished; self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear(); self.tranquilizer.clear();
Ok(WorkerStatus::Idle) Ok(WorkerState::Idle)
} }
} }
_ => Ok(WorkerStatus::Idle), _ => Ok(WorkerState::Idle),
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
let (wait_until, command) = match &self.work { let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerStatus::Busy, ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => ( ScrubWorkerState::Finished => (
self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
@ -330,7 +330,7 @@ impl Worker for ScrubWorker {
let now = now_msec(); let now = now_msec();
if now >= wait_until { if now >= wait_until {
self.handle_cmd(command).await; self.handle_cmd(command).await;
return WorkerStatus::Busy; return WorkerState::Busy;
} }
let delay = Duration::from_millis(wait_until - now); let delay = Duration::from_millis(wait_until - now);
select! { select! {
@ -338,13 +338,13 @@ impl Worker for ScrubWorker {
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
self.handle_cmd(cmd).await; self.handle_cmd(cmd).await;
} else { } else {
return WorkerStatus::Done; return WorkerState::Done;
} }
} }
match &self.work { match &self.work {
ScrubWorkerState::Running(_) => WorkerStatus::Busy, ScrubWorkerState::Running(_) => WorkerState::Busy,
_ => WorkerStatus::Idle, _ => WorkerState::Idle,
} }
} }
} }

View file

@ -245,10 +245,10 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>(); let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| { wi.sort_by_key(|(tid, info)| {
( (
match info.status { match info.state {
WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0, WorkerState::Busy | WorkerState::Throttled(_) => 0,
WorkerStatus::Idle => 1, WorkerState::Idle => 1,
WorkerStatus::Done => 2, WorkerState::Done => 2,
}, },
*tid, *tid,
) )
@ -256,14 +256,14 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut table = vec![]; let mut table = vec![];
for (tid, info) in wi.iter() { for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) { if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue; continue;
} }
if wlo.errors && info.errors == 0 { if wlo.errors && info.errors == 0 {
continue; continue;
} }
table.push(format!("{}\t{}\t{}", tid, info.status, info.name)); table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
if let Some(i) = &info.info { if let Some(i) = &info.info {
table.push(format!("\t\t {}", i)); table.push(format!("\t\t {}", i));
} }

View file

@ -92,7 +92,7 @@ impl Worker for RepairVersionsWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => { Some((k, v)) => {
self.pos = k; self.pos = k;
@ -100,7 +100,7 @@ impl Worker for RepairVersionsWorker {
} }
None => { None => {
info!("repair_versions: finished, done {}", self.counter); info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerStatus::Done); return Ok(WorkerState::Done);
} }
}; };
@ -134,10 +134,10 @@ impl Worker for RepairVersionsWorker {
} }
} }
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!() unreachable!()
} }
} }
@ -173,7 +173,7 @@ impl Worker for RepairBlockrefsWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => { Some((k, v)) => {
self.pos = k; self.pos = k;
@ -181,7 +181,7 @@ impl Worker for RepairBlockrefsWorker {
} }
None => { None => {
info!("repair_block_ref: finished, done {}", self.counter); info!("repair_block_ref: finished, done {}", self.counter);
return Ok(WorkerStatus::Done); return Ok(WorkerState::Done);
} }
}; };
@ -212,10 +212,10 @@ impl Worker for RepairBlockrefsWorker {
} }
} }
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!() unreachable!()
} }
} }

View file

@ -415,7 +415,7 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
} }
} }
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
// This loop batches updates to counters to be sent all at once. // This loop batches updates to counters to be sent all at once.
// They are sent once the propagate_rx channel has been emptied (or is closed). // They are sent once the propagate_rx channel has been emptied (or is closed).
let closed = loop { let closed = loop {
@ -435,7 +435,7 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
self.errors += 1; self.errors += 1;
if self.errors >= 2 && *must_exit.borrow() { if self.errors >= 2 && *must_exit.borrow() {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
return Ok(WorkerStatus::Done); return Ok(WorkerState::Done);
} }
// Propagate error up to worker manager, it will log it, increment a counter, // Propagate error up to worker manager, it will log it, increment a counter,
// and sleep for a certain delay (with exponential backoff), waiting for // and sleep for a certain delay (with exponential backoff), waiting for
@ -448,23 +448,23 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
self.errors = 0; self.errors = 0;
} }
return Ok(WorkerStatus::Busy); return Ok(WorkerState::Busy);
} else if closed { } else if closed {
return Ok(WorkerStatus::Done); return Ok(WorkerState::Done);
} else { } else {
return Ok(WorkerStatus::Idle); return Ok(WorkerState::Idle);
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
match self.propagate_rx.recv().await { match self.propagate_rx.recv().await {
Some((pk, sk, counters)) => { Some((pk, sk, counters)) => {
self.add_ent(pk, sk, counters); self.add_ent(pk, sk, counters);
WorkerStatus::Busy WorkerState::Busy
} }
None => match self.buf.is_empty() { None => match self.buf.is_empty() {
false => WorkerStatus::Busy, false => WorkerState::Busy,
true => WorkerStatus::Done, true => WorkerState::Done,
}, },
} }
} }

View file

@ -347,22 +347,22 @@ where
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
match self.gc.gc_loop_iter().await? { match self.gc.gc_loop_iter().await? {
None => Ok(WorkerStatus::Busy), None => Ok(WorkerState::Busy),
Some(delay) => { Some(delay) => {
self.wait_delay = delay; self.wait_delay = delay;
Ok(WorkerStatus::Idle) Ok(WorkerState::Idle)
} }
} }
} }
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
if *must_exit.borrow() { if *must_exit.borrow() {
return WorkerStatus::Done; return WorkerState::Done;
} }
tokio::time::sleep(self.wait_delay).await; tokio::time::sleep(self.wait_delay).await;
WorkerStatus::Busy WorkerState::Busy
} }
} }

View file

@ -82,12 +82,12 @@ where
ret ret
} }
fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> { fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? { if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?; self.update_item(&key, &valhash)?;
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} else { } else {
Ok(WorkerStatus::Idle) Ok(WorkerState::Idle)
} }
} }
@ -325,27 +325,27 @@ where
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
let updater = self.0.clone(); let updater = self.0.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
for _i in 0..100 { for _i in 0..100 {
let s = updater.updater_loop_iter(); let s = updater.updater_loop_iter();
if !matches!(s, Ok(WorkerStatus::Busy)) { if !matches!(s, Ok(WorkerState::Busy)) {
return s; return s;
} }
} }
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
}) })
.await .await
.unwrap() .unwrap()
} }
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
if *must_exit.borrow() { if *must_exit.borrow() {
return WorkerStatus::Done; return WorkerState::Done;
} }
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
WorkerStatus::Busy WorkerState::Busy
} }
} }

View file

@ -586,18 +586,18 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
} }
} }
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if let Some(partition) = self.pop_task() { if let Some(partition) = self.pop_task() {
self.syncer.sync_partition(&partition, must_exit).await?; self.syncer.sync_partition(&partition, must_exit).await?;
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} else { } else {
Ok(WorkerStatus::Idle) Ok(WorkerState::Idle)
} }
} }
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
if *must_exit.borrow() { if *must_exit.borrow() {
return WorkerStatus::Done; return WorkerState::Done;
} }
select! { select! {
s = self.add_full_sync_rx.recv() => { s = self.add_full_sync_rx.recv() => {
@ -619,8 +619,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
} }
} }
match self.todo.is_empty() { match self.todo.is_empty() {
false => WorkerStatus::Busy, false => WorkerState::Busy,
true => WorkerStatus::Idle, true => WorkerState::Idle,
} }
} }
} }

View file

@ -24,17 +24,17 @@ impl Worker for JobWorker {
async fn work( async fn work(
&mut self, &mut self,
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerState, Error> {
match self.next_job.take() { match self.next_job.take() {
None => return Ok(WorkerStatus::Idle), None => return Ok(WorkerState::Idle),
Some(job) => { Some(job) => {
job.await?; job.await?;
Ok(WorkerStatus::Busy) Ok(WorkerState::Busy)
} }
} }
} }
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
loop { loop {
match self.job_chan.lock().await.recv().await { match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => { Some((job, cancellable)) => {
@ -42,9 +42,9 @@ impl Worker for JobWorker {
continue; continue;
} }
self.next_job = Some(job); self.next_job = Some(job);
return WorkerStatus::Busy; return WorkerState::Busy;
} }
None => return WorkerStatus::Done, None => return WorkerState::Done,
} }
} }
} }

View file

@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error; use crate::error::Error;
use worker::WorkerProcessor; use worker::WorkerProcessor;
pub use worker::{Worker, WorkerStatus}; pub use worker::{Worker, WorkerState};
pub(crate) type JobOutput = Result<(), Error>; pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
@ -30,7 +30,7 @@ pub struct BackgroundRunner {
pub struct WorkerInfo { pub struct WorkerInfo {
pub name: String, pub name: String,
pub info: Option<String>, pub info: Option<String>,
pub status: WorkerStatus, pub state: WorkerState,
pub errors: usize, pub errors: usize,
pub consecutive_errors: usize, pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>, pub last_error: Option<(String, u64)>,

View file

@ -16,20 +16,20 @@ use crate::error::Error;
use crate::time::now_msec; use crate::time::now_msec;
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerStatus { pub enum WorkerState {
Busy, Busy,
Throttled(f32), Throttled(f32),
Idle, Idle,
Done, Done,
} }
impl std::fmt::Display for WorkerStatus { impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
WorkerStatus::Busy => write!(f, "Busy"), WorkerState::Busy => write!(f, "Busy"),
WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t), WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
WorkerStatus::Idle => write!(f, "Idle"), WorkerState::Idle => write!(f, "Idle"),
WorkerStatus::Done => write!(f, "Done"), WorkerState::Done => write!(f, "Done"),
} }
} }
} }
@ -43,18 +43,18 @@ pub trait Worker: Send {
} }
/// Work: do a basic unit of work, if one is available (otherwise, should return /// Work: do a basic unit of work, if one is available (otherwise, should return
/// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
/// middle of processing, it will only be interrupted at the last minute when Garage is trying /// middle of processing, it will only be interrupted at the last minute when Garage is trying
/// to exit and this hasn't returned yet. This function may return an error to indicate that /// to exit and this hasn't returned yet. This function may return an error to indicate that
/// its unit of work could not be processed due to an error: the error will be logged and /// its unit of work could not be processed due to an error: the error will be logged and
/// .work() will be called again after a short delay. /// .work() will be called again after a short delay.
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in /// Wait for work: await for some task to become available. This future can be interrupted in
/// the middle for any reason. This future doesn't have to await on must_exit.changed(), we /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
/// are doing it for you. Therefore it only receives a read refernce to must_exit which allows /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
/// it to check if we are exiting. /// it to check if we are exiting.
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus; async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
} }
pub(crate) struct WorkerProcessor { pub(crate) struct WorkerProcessor {
@ -100,7 +100,7 @@ impl WorkerProcessor {
stop_signal, stop_signal,
stop_signal_worker, stop_signal_worker,
worker: new_worker, worker: new_worker,
status: WorkerStatus::Busy, state: WorkerState::Busy,
errors: 0, errors: 0,
consecutive_errors: 0, consecutive_errors: 0,
last_error: None, last_error: None,
@ -113,13 +113,13 @@ impl WorkerProcessor {
} }
worker = await_next_worker => { worker = await_next_worker => {
if let Some(mut worker) = worker { if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
// Save worker info // Save worker info
let mut wi = self.worker_info.lock().unwrap(); let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) { match wi.get_mut(&worker.task_id) {
Some(i) => { Some(i) => {
i.status = worker.status; i.state = worker.state;
i.info = worker.worker.info(); i.info = worker.worker.info();
i.errors = worker.errors; i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors; i.consecutive_errors = worker.consecutive_errors;
@ -130,7 +130,7 @@ impl WorkerProcessor {
None => { None => {
wi.insert(worker.task_id, WorkerInfo { wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(), name: worker.worker.name(),
status: worker.status, state: worker.state,
info: worker.worker.info(), info: worker.worker.info(),
errors: worker.errors, errors: worker.errors,
consecutive_errors: worker.consecutive_errors, consecutive_errors: worker.consecutive_errors,
@ -139,7 +139,7 @@ impl WorkerProcessor {
} }
} }
if worker.status == WorkerStatus::Done { if worker.state == WorkerState::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else { } else {
workers.push(async move { workers.push(async move {
@ -157,14 +157,14 @@ impl WorkerProcessor {
let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move { let drain_everything = async move {
while let Some(mut worker) = workers.next().await { while let Some(mut worker) = workers.next().await {
if worker.status == WorkerStatus::Done { if worker.state == WorkerState::Done {
info!( info!(
"Worker {} (TID {}) exited", "Worker {} (TID {}) exited",
worker.worker.name(), worker.worker.name(),
worker.task_id worker.task_id
); );
} else if Instant::now() > drain_half_time { } else if Instant::now() > drain_half_time {
warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status); warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
} else { } else {
workers.push( workers.push(
async move { async move {
@ -193,7 +193,7 @@ struct WorkerHandler {
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
stop_signal_worker: watch::Receiver<bool>, stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>, worker: Box<dyn Worker>,
status: WorkerStatus, state: WorkerState,
errors: usize, errors: usize,
consecutive_errors: usize, consecutive_errors: usize,
last_error: Option<(String, u64)>, last_error: Option<(String, u64)>,
@ -201,10 +201,10 @@ struct WorkerHandler {
impl WorkerHandler { impl WorkerHandler {
async fn step(&mut self) { async fn step(&mut self) {
match self.status { match self.state {
WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => { Ok(s) => {
self.status = s; self.state = s;
self.consecutive_errors = 0; self.consecutive_errors = 0;
} }
Err(e) => { Err(e) => {
@ -219,12 +219,12 @@ impl WorkerHandler {
self.last_error = Some((format!("{}", e), now_msec())); self.last_error = Some((format!("{}", e), now_msec()));
// Sleep a bit so that error won't repeat immediately, exponential backoff // Sleep a bit so that error won't repeat immediately, exponential backoff
// strategy (min 1sec, max ~60sec) // strategy (min 1sec, max ~60sec)
self.status = WorkerStatus::Throttled( self.state = WorkerState::Throttled(
(1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
); );
} }
}, },
WorkerStatus::Throttled(delay) => { WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state // Sleep for given delay and go back to busy state
if !*self.stop_signal.borrow() { if !*self.stop_signal.borrow() {
select! { select! {
@ -232,13 +232,13 @@ impl WorkerHandler {
_ = self.stop_signal.changed() => (), _ = self.stop_signal.changed() => (),
} }
} }
self.status = WorkerStatus::Busy; self.state = WorkerState::Busy;
} }
WorkerStatus::Idle => { WorkerState::Idle => {
if *self.stop_signal.borrow() { if *self.stop_signal.borrow() {
select! { select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
self.status = new_st; self.state = new_st;
} }
_ = tokio::time::sleep(Duration::from_secs(1)) => { _ = tokio::time::sleep(Duration::from_secs(1)) => {
// stay in Idle state // stay in Idle state
@ -247,7 +247,7 @@ impl WorkerHandler {
} else { } else {
select! { select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
self.status = new_st; self.state = new_st;
} }
_ = self.stop_signal.changed() => { _ = self.stop_signal.changed() => {
// stay in Idle state // stay in Idle state
@ -255,7 +255,7 @@ impl WorkerHandler {
} }
} }
} }
WorkerStatus::Done => unreachable!(), WorkerState::Done => unreachable!(),
} }
} }
} }

View file

@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use tokio::time::sleep; use tokio::time::sleep;
use crate::background::WorkerStatus; use crate::background::WorkerState;
/// A tranquilizer is a helper object that is used to make /// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time. /// background operations not take up too much time.
@ -61,10 +61,10 @@ impl Tranquilizer {
} }
#[must_use] #[must_use]
pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus { pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
match self.tranquilize_internal(tranquility) { match self.tranquilize_internal(tranquility) {
Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()), Some(delay) => WorkerState::Throttled(delay.as_secs_f32()),
None => WorkerStatus::Busy, None => WorkerState::Busy,
} }
} }