Background task manager #332
1 changed files with 73 additions and 51 deletions
|
@ -9,9 +9,9 @@ use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
use futures::select;
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::{watch, Mutex, Notify};
|
use tokio::sync::{watch, Mutex, Notify};
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
|
@ -22,6 +22,7 @@ use opentelemetry::{
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
use garage_db::counted_tree_hack::CountedTree;
|
use garage_db::counted_tree_hack::CountedTree;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
use garage_util::metrics::RecordDuration;
|
use garage_util::metrics::RecordDuration;
|
||||||
|
@ -110,6 +111,12 @@ pub struct BlockManager {
|
||||||
// it INSIDE a Mutex.
|
// it INSIDE a Mutex.
|
||||||
struct BlockManagerLocked();
|
struct BlockManagerLocked();
|
||||||
|
|
||||||
|
enum BlockIterResult {
|
||||||
|
BusyDidSomething,
|
||||||
|
BusyDidNothing,
|
||||||
|
IdleFor(Duration),
|
||||||
|
}
|
||||||
|
|
||||||
impl BlockManager {
|
impl BlockManager {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
db: &db::Db,
|
db: &db::Db,
|
||||||
|
@ -557,11 +564,14 @@ impl BlockManager {
|
||||||
fn spawn_background_worker(self: Arc<Self>) {
|
fn spawn_background_worker(self: Arc<Self>) {
|
||||||
// Launch a background workers for background resync loop processing
|
// Launch a background workers for background resync loop processing
|
||||||
let background = self.system.background.clone();
|
let background = self.system.background.clone();
|
||||||
|
let worker = BlockResyncWorker {
|
||||||
|
manager: self,
|
||||||
|
tranquilizer: Tranquilizer::new(30),
|
||||||
|
next_delay: Duration::from_secs(10),
|
||||||
|
};
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
background.spawn_worker("block resync worker".into(), move |must_exit| {
|
background.spawn_worker(worker);
|
||||||
self.resync_loop(must_exit)
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,37 +589,7 @@ impl BlockManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
|
async fn resync_iter(&self) -> Result<BlockIterResult, db::Error> {
|
||||||
let mut tranquilizer = Tranquilizer::new(30);
|
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
|
||||||
match self.resync_iter(&mut must_exit).await {
|
|
||||||
Ok(true) => {
|
|
||||||
tranquilizer.tranquilize(self.background_tranquility).await;
|
|
||||||
}
|
|
||||||
Ok(false) => {
|
|
||||||
tranquilizer.reset();
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// The errors that we have here are only Sled errors
|
|
||||||
// We don't really know how to handle them so just ¯\_(ツ)_/¯
|
|
||||||
// (there is kind of an assumption that Sled won't error on us,
|
|
||||||
// if it does there is not much we can do -- TODO should we just panic?)
|
|
||||||
error!(
|
|
||||||
"Could not do a resync iteration: {} (this is a very bad error)",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
tranquilizer.reset();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The result of resync_iter is:
|
|
||||||
// - Ok(true) -> a block was processed (successfully or not)
|
|
||||||
// - Ok(false) -> no block was processed, but we are ready for the next iteration
|
|
||||||
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
|
|
||||||
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
|
|
||||||
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
|
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
|
||||||
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
|
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
|
@ -629,7 +609,7 @@ impl BlockManager {
|
||||||
// (we want to do the remove after the insert to ensure
|
// (we want to do the remove after the insert to ensure
|
||||||
// that the item is not lost if we crash in-between)
|
// that the item is not lost if we crash in-between)
|
||||||
self.resync_queue.remove(time_bytes)?;
|
self.resync_queue.remove(time_bytes)?;
|
||||||
return Ok(false);
|
return Ok(BlockIterResult::BusyDidNothing);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -676,15 +656,11 @@ impl BlockManager {
|
||||||
self.resync_queue.remove(time_bytes)?;
|
self.resync_queue.remove(time_bytes)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(true)
|
Ok(BlockIterResult::BusyDidSomething)
|
||||||
} else {
|
} else {
|
||||||
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
Ok(BlockIterResult::IdleFor(Duration::from_millis(
|
||||||
select! {
|
time_msec - now,
|
||||||
_ = delay.fuse() => {},
|
)))
|
||||||
_ = self.resync_notify.notified().fuse() => {},
|
|
||||||
_ = must_exit.changed().fuse() => {},
|
|
||||||
}
|
|
||||||
Ok(false)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Here we wait either for a notification that an item has been
|
// Here we wait either for a notification that an item has been
|
||||||
|
@ -693,13 +669,7 @@ impl BlockManager {
|
||||||
// between the time we checked the queue and the first poll
|
// between the time we checked the queue and the first poll
|
||||||
// to resync_notify.notified(): if that happens, we'll just loop
|
// to resync_notify.notified(): if that happens, we'll just loop
|
||||||
// back 10 seconds later, which is fine.
|
// back 10 seconds later, which is fine.
|
||||||
let delay = tokio::time::sleep(Duration::from_secs(10));
|
Ok(BlockIterResult::IdleFor(Duration::from_secs(10)))
|
||||||
select! {
|
|
||||||
_ = delay.fuse() => {},
|
|
||||||
_ = self.resync_notify.notified().fuse() => {},
|
|
||||||
_ = must_exit.changed().fuse() => {},
|
|
||||||
}
|
|
||||||
Ok(false)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -898,6 +868,58 @@ impl EndpointHandler<BlockRpc> for BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct BlockResyncWorker {
|
||||||
|
manager: Arc<BlockManager>,
|
||||||
|
tranquilizer: Tranquilizer,
|
||||||
|
next_delay: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Worker for BlockResyncWorker {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"Block resync worker".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn work(
|
||||||
|
&mut self,
|
||||||
|
_must_exit: &mut watch::Receiver<bool>,
|
||||||
|
) -> Result<WorkerStatus, Error> {
|
||||||
|
self.tranquilizer.reset();
|
||||||
|
match self.manager.resync_iter().await {
|
||||||
|
Ok(BlockIterResult::BusyDidSomething) => {
|
||||||
|
self.tranquilizer
|
||||||
|
.tranquilize(self.manager.background_tranquility)
|
||||||
|
.await;
|
||||||
|
Ok(WorkerStatus::Busy)
|
||||||
|
}
|
||||||
|
Ok(BlockIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
|
||||||
|
Ok(BlockIterResult::IdleFor(delay)) => {
|
||||||
|
self.next_delay = delay;
|
||||||
|
Ok(WorkerStatus::Idle)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// The errors that we have here are only Sled errors
|
||||||
|
// We don't really know how to handle them so just ¯\_(ツ)_/¯
|
||||||
|
// (there is kind of an assumption that Sled won't error on us,
|
||||||
|
// if it does there is not much we can do -- TODO should we just panic?)
|
||||||
|
error!(
|
||||||
|
"Could not do a resync iteration: {} (this is a very bad error)",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
Err(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
|
||||||
|
select! {
|
||||||
|
_ = tokio::time::sleep(self.next_delay) => (),
|
||||||
|
_ = self.manager.resync_notify.notified() => (),
|
||||||
|
};
|
||||||
|
WorkerStatus::Busy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct BlockStatus {
|
struct BlockStatus {
|
||||||
exists: bool,
|
exists: bool,
|
||||||
needed: RcEntry,
|
needed: RcEntry,
|
||||||
|
|
Loading…
Reference in a new issue