Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
2 changed files with 13 additions and 12 deletions
Showing only changes of commit 14337d2a56 - Show all commits

View file

@ -109,7 +109,7 @@ pub struct BlockManager {
// it INSIDE a Mutex. // it INSIDE a Mutex.
struct BlockManagerLocked(); struct BlockManagerLocked();
enum BlockIterResult { enum ResyncIterResult {
BusyDidSomething, BusyDidSomething,
BusyDidNothing, BusyDidNothing,
IdleFor(Duration), IdleFor(Duration),
@ -478,7 +478,7 @@ impl BlockManager {
fn spawn_background_worker(self: Arc<Self>) { fn spawn_background_worker(self: Arc<Self>) {
// Launch a background workers for background resync loop processing // Launch a background workers for background resync loop processing
let background = self.system.background.clone(); let background = self.system.background.clone();
let worker = BlockResyncWorker { let worker = ResyncWorker {
manager: self, manager: self,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10), next_delay: Duration::from_secs(10),
@ -503,7 +503,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
async fn resync_iter(&self) -> Result<BlockIterResult, db::Error> { async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? { if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec(); let now = now_msec();
@ -523,7 +523,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure // (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between) // that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?; self.resync_queue.remove(time_bytes)?;
return Ok(BlockIterResult::BusyDidNothing); return Ok(ResyncIterResult::BusyDidNothing);
} }
} }
@ -570,9 +570,9 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?; self.resync_queue.remove(time_bytes)?;
} }
Ok(BlockIterResult::BusyDidSomething) Ok(ResyncIterResult::BusyDidSomething)
} else { } else {
Ok(BlockIterResult::IdleFor(Duration::from_millis( Ok(ResyncIterResult::IdleFor(Duration::from_millis(
time_msec - now, time_msec - now,
))) )))
} }
@ -583,7 +583,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll // between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop // to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine. // back 10 seconds later, which is fine.
Ok(BlockIterResult::IdleFor(Duration::from_secs(10))) Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
} }
} }
@ -716,14 +716,14 @@ impl EndpointHandler<BlockRpc> for BlockManager {
} }
} }
struct BlockResyncWorker { struct ResyncWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
next_delay: Duration, next_delay: Duration,
} }
#[async_trait] #[async_trait]
impl Worker for BlockResyncWorker { impl Worker for ResyncWorker {
fn name(&self) -> String { fn name(&self) -> String {
"Block resync worker".into() "Block resync worker".into()
} }
@ -734,14 +734,14 @@ impl Worker for BlockResyncWorker {
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerStatus, Error> {
self.tranquilizer.reset(); self.tranquilizer.reset();
match self.manager.resync_iter().await { match self.manager.resync_iter().await {
Ok(BlockIterResult::BusyDidSomething) => { Ok(ResyncIterResult::BusyDidSomething) => {
self.tranquilizer self.tranquilizer
.tranquilize(self.manager.background_tranquility) .tranquilize(self.manager.background_tranquility)
.await; .await;
Ok(WorkerStatus::Busy) Ok(WorkerStatus::Busy)
} }
Ok(BlockIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
Ok(BlockIterResult::IdleFor(delay)) => { Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay; self.next_delay = delay;
Ok(WorkerStatus::Idle) Ok(WorkerStatus::Idle)
} }

View file

@ -86,6 +86,7 @@ impl WorkerProcessor {
} }
worker = await_next_worker => { worker = await_next_worker => {
if let Some(mut worker) = worker { if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
// TODO save new worker status somewhere // TODO save new worker status somewhere
if worker.status == WorkerStatus::Done { if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);