Background task manager #332
8 changed files with 89 additions and 27 deletions
|
@ -734,12 +734,9 @@ impl Worker for ResyncWorker {
|
|||
) -> Result<WorkerStatus, Error> {
|
||||
self.tranquilizer.reset();
|
||||
match self.manager.resync_iter().await {
|
||||
Ok(ResyncIterResult::BusyDidSomething) => {
|
||||
self.tranquilizer
|
||||
.tranquilize(self.manager.background_tranquility)
|
||||
.await;
|
||||
Ok(WorkerStatus::Busy)
|
||||
}
|
||||
Ok(ResyncIterResult::BusyDidSomething) => Ok(self
|
||||
.tranquilizer
|
||||
.tranquilize_worker(self.manager.background_tranquility)),
|
||||
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
|
||||
Ok(ResyncIterResult::IdleFor(delay)) => {
|
||||
self.next_delay = delay;
|
||||
|
@ -750,10 +747,8 @@ impl Worker for ResyncWorker {
|
|||
// We don't really know how to handle them so just ¯\_(ツ)_/¯
|
||||
// (there is kind of an assumption that Sled won't error on us,
|
||||
// if it does there is not much we can do -- TODO should we just panic?)
|
||||
error!(
|
||||
"Could not do a resync iteration: {} (this is a very bad error)",
|
||||
e
|
||||
);
|
||||
// Here we just give the error to the worker manager,
|
||||
// it will print it to the logs and increment a counter
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,8 +138,7 @@ impl Worker for ScrubWorker {
|
|||
self.tranquilizer.reset();
|
||||
if let Some(hash) = self.iterator.next().await? {
|
||||
let _ = self.manager.read_block(&hash).await;
|
||||
self.tranquilizer.tranquilize(self.tranquility).await;
|
||||
Ok(WorkerStatus::Busy)
|
||||
Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
|
||||
} else {
|
||||
Ok(WorkerStatus::Done)
|
||||
}
|
||||
|
|
|
@ -835,7 +835,9 @@ impl AdminRpcHandler {
|
|||
let workers = if busy {
|
||||
workers
|
||||
.into_iter()
|
||||
.filter(|(_, w)| w.status == WorkerStatus::Busy)
|
||||
.filter(|(_, w)| {
|
||||
matches!(w.status, WorkerStatus::Busy | WorkerStatus::Throttled(_))
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
workers
|
||||
|
|
|
@ -242,7 +242,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
|
|||
wi.sort_by_key(|(tid, info)| {
|
||||
(
|
||||
match info.status {
|
||||
WorkerStatus::Busy => 0,
|
||||
WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0,
|
||||
WorkerStatus::Idle => 1,
|
||||
WorkerStatus::Done => 2,
|
||||
},
|
||||
|
@ -256,6 +256,20 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
|
|||
if let Some(i) = &info.info {
|
||||
table.push(format!("\t\t{}", i));
|
||||
}
|
||||
if info.consecutive_errors > 0 {
|
||||
table.push(format!(
|
||||
"\t\t{} CONSECUTIVE ERRORS ({} total), last: {}",
|
||||
info.consecutive_errors,
|
||||
info.errors,
|
||||
info.last_error.as_deref().unwrap_or("(?)")
|
||||
));
|
||||
} else if info.errors > 0 {
|
||||
table.push(format!(
|
||||
"\t\t{} errors, last: {}",
|
||||
info.errors,
|
||||
info.last_error.as_deref().unwrap_or("(?)")
|
||||
));
|
||||
}
|
||||
}
|
||||
format_table(table);
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ use core::ops::Bound;
|
|||
use std::collections::{hash_map, BTreeMap, HashMap};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -408,6 +407,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
|
|||
format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
|
||||
}
|
||||
|
||||
fn info(&self) -> Option<String> {
|
||||
Some(format!("{} items in queue", self.buf.len()))
|
||||
}
|
||||
|
||||
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
|
||||
// This loop batches updates to counters to be sent all at once.
|
||||
// They are sent once the propagate_rx channel has been emptied (or is closed).
|
||||
|
@ -429,9 +432,10 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
|
|||
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
|
||||
return Ok(WorkerStatus::Done);
|
||||
}
|
||||
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
return Ok(WorkerStatus::Busy);
|
||||
// Propagate error up to worker manager, it will log it, increment a counter,
|
||||
// and sleep for a certain delay (with exponential backoff), waiting for
|
||||
// things to go back to normal
|
||||
return Err(e);
|
||||
} else {
|
||||
self.buf.clear();
|
||||
self.errors = 0;
|
||||
|
|
|
@ -31,6 +31,9 @@ pub struct WorkerInfo {
|
|||
pub name: String,
|
||||
pub info: Option<String>,
|
||||
pub status: WorkerStatus,
|
||||
pub errors: usize,
|
||||
pub consecutive_errors: usize,
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
impl BackgroundRunner {
|
||||
|
|
|
@ -17,6 +17,7 @@ use crate::error::Error;
|
|||
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum WorkerStatus {
|
||||
Busy,
|
||||
Throttled(f32),
|
||||
Idle,
|
||||
Done,
|
||||
}
|
||||
|
@ -82,14 +83,17 @@ impl WorkerProcessor {
|
|||
next_task_id += 1;
|
||||
let stop_signal = self.stop_signal.clone();
|
||||
let stop_signal_worker = self.stop_signal.clone();
|
||||
workers.push(async move {
|
||||
let mut worker = WorkerHandler {
|
||||
let mut worker = WorkerHandler {
|
||||
task_id,
|
||||
stop_signal,
|
||||
stop_signal_worker,
|
||||
worker: new_worker,
|
||||
status: WorkerStatus::Busy,
|
||||
errors: 0,
|
||||
consecutive_errors: 0,
|
||||
last_error: None,
|
||||
};
|
||||
workers.push(async move {
|
||||
worker.step().await;
|
||||
worker
|
||||
}.boxed());
|
||||
|
@ -98,21 +102,31 @@ impl WorkerProcessor {
|
|||
worker = await_next_worker => {
|
||||
if let Some(mut worker) = worker {
|
||||
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
|
||||
|
||||
// Save worker info
|
||||
let mut wi = self.worker_info.lock().unwrap();
|
||||
match wi.get_mut(&worker.task_id) {
|
||||
Some(i) => {
|
||||
i.status = worker.status;
|
||||
i.info = worker.worker.info();
|
||||
i.errors = worker.errors;
|
||||
i.consecutive_errors = worker.consecutive_errors;
|
||||
if worker.last_error.is_some() {
|
||||
i.last_error = worker.last_error.take();
|
||||
}
|
||||
}
|
||||
None => {
|
||||
wi.insert(worker.task_id, WorkerInfo {
|
||||
name: worker.worker.name(),
|
||||
status: worker.status,
|
||||
info: worker.worker.info(),
|
||||
errors: worker.errors,
|
||||
consecutive_errors: worker.consecutive_errors,
|
||||
last_error: worker.last_error.take(),
|
||||
});
|
||||
}
|
||||
}
|
||||
// TODO save new worker status somewhere
|
||||
|
||||
if worker.status == WorkerStatus::Done {
|
||||
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
|
||||
} else {
|
||||
|
@ -169,6 +183,9 @@ struct WorkerHandler {
|
|||
stop_signal_worker: watch::Receiver<bool>,
|
||||
worker: Box<dyn Worker>,
|
||||
status: WorkerStatus,
|
||||
errors: usize,
|
||||
consecutive_errors: usize,
|
||||
last_error: Option<String>,
|
||||
}
|
||||
|
||||
impl WorkerHandler {
|
||||
|
@ -177,6 +194,7 @@ impl WorkerHandler {
|
|||
WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
|
||||
Ok(s) => {
|
||||
self.status = s;
|
||||
self.consecutive_errors = 0;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
|
@ -185,11 +203,21 @@ impl WorkerHandler {
|
|||
self.task_id,
|
||||
e
|
||||
);
|
||||
// Sleep a bit so that error won't repeat immediately
|
||||
// (TODO good way to handle errors)
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
self.errors += 1;
|
||||
self.consecutive_errors += 1;
|
||||
self.last_error = Some(format!("{}", e));
|
||||
// Sleep a bit so that error won't repeat immediately, exponential backoff
|
||||
// strategy (min 1sec, max ~60sec)
|
||||
self.status = WorkerStatus::Throttled(
|
||||
(1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
|
||||
);
|
||||
}
|
||||
},
|
||||
WorkerStatus::Throttled(delay) => {
|
||||
// Sleep for given delay and go back to busy state
|
||||
tokio::time::sleep(Duration::from_secs_f32(delay)).await;
|
||||
self.status = WorkerStatus::Busy;
|
||||
}
|
||||
WorkerStatus::Idle => {
|
||||
if *self.stop_signal.borrow() {
|
||||
select! {
|
||||
|
|
|
@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
|
|||
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::background::WorkerStatus;
|
||||
|
||||
/// A tranquilizer is a helper object that is used to make
|
||||
/// background operations not take up too much time.
|
||||
///
|
||||
|
@ -33,7 +35,7 @@ impl Tranquilizer {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn tranquilize(&mut self, tranquility: u32) {
|
||||
fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
|
||||
let observation = Instant::now() - self.last_step_begin;
|
||||
|
||||
self.observations.push_back(observation);
|
||||
|
@ -45,10 +47,25 @@ impl Tranquilizer {
|
|||
|
||||
if !self.observations.is_empty() {
|
||||
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
|
||||
sleep(delay).await;
|
||||
Some(delay)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
self.reset();
|
||||
pub async fn tranquilize(&mut self, tranquility: u32) {
|
||||
if let Some(delay) = self.tranquilize_internal(tranquility) {
|
||||
sleep(delay).await;
|
||||
self.reset();
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus {
|
||||
match self.tranquilize_internal(tranquility) {
|
||||
Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()),
|
||||
None => WorkerStatus::Busy,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
|
|
Loading…
Reference in a new issue