diff --git a/Cargo.lock b/Cargo.lock index ecdf8a57..486ad4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1065,11 +1065,11 @@ dependencies = [ "err-derive 0.3.1", "heed", "hexdump", - "log", "mktemp", "pretty_env_logger", "rusqlite", "sled", + "tracing", ] [[package]] @@ -1258,6 +1258,7 @@ dependencies = [ name = "garage_util" version = "0.7.0" dependencies = [ + "async-trait", "blake2", "chrono", "err-derive 0.3.1", diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 6d8f64be..f697054b 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -19,7 +19,7 @@ required-features = ["cli"] [dependencies] err-derive = "0.3" hexdump = "0.1" -log = "0.4" +tracing = "0.1.30" heed = "0.11" rusqlite = { version = "0.27", features = ["bundled"] } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 68d96ca0..97a78b07 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::ptr::NonNull; use std::sync::{Arc, Mutex, MutexGuard}; -use log::trace; +use tracing::trace; use rusqlite::{params, Connection, Rows, Statement, Transaction}; diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 5d073436..57c70ffb 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] garage_db = { version = "0.8.0", path = "../db" } +async-trait = "0.1" blake2 = "0.9" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } diff --git a/src/util/background.rs b/src/util/background.rs deleted file mode 100644 index d35425f5..00000000 --- a/src/util/background.rs +++ /dev/null @@ -1,160 +0,0 @@ -//! Job runner for futures and async functions -use core::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - -use futures::future::*; -use futures::select; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; - -use crate::error::Error; - -type JobOutput = Result<(), Error>; -type Job = Pin + Send>>; - -/// Job runner for futures and async functions -pub struct BackgroundRunner { - stop_signal: watch::Receiver, - queue_in: mpsc::UnboundedSender<(Job, bool)>, - worker_in: mpsc::UnboundedSender>, -} - -impl BackgroundRunner { - /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver, - ) -> (Arc, tokio::task::JoinHandle<()>) { - let (worker_in, mut worker_out) = mpsc::unbounded_channel(); - - let stop_signal_2 = stop_signal.clone(); - let await_all_done = tokio::spawn(async move { - let mut workers = FuturesUnordered::new(); - let mut shutdown_timer = 0; - loop { - let closed = match worker_out.try_recv() { - Ok(wkr) => { - workers.push(wkr); - false - } - Err(TryRecvError::Empty) => false, - Err(TryRecvError::Disconnected) => true, - }; - select! { - res = workers.next() => { - if let Some(Err(e)) = res { - error!("Worker exited with error: {}", e); - } - } - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if closed || *stop_signal_2.borrow() { - shutdown_timer += 1; - if shutdown_timer >= 10 { - break; - } - } - } - } - } - }); - - let (queue_in, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - let stop_signal = stop_signal.clone(); - - worker_in - .send(tokio::spawn(async move { - loop { - let (job, cancellable) = { - select! { - item = wait_job(&queue_out).fuse() => match item { - // We received a task, process it - Some(x) => x, - // We received a signal that no more tasks will ever be sent - // because the sending side was dropped. Exit now. - None => break, - }, - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal.borrow() { - // Nothing has been going on for 5 secs, and we are shutting - // down. Exit now. - break; - } else { - // Nothing is going on but we don't want to exit. - continue; - } - } - } - }; - if cancellable && *stop_signal.borrow() { - continue; - } - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } - info!("Background worker {} exiting", i); - })) - .unwrap(); - } - - let bgrunner = Arc::new(Self { - stop_signal, - queue_in, - worker_in, - }); - (bgrunner, await_all_done) - } - - /// Spawn a task to be run in background - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, false)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, true)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - pub fn spawn_worker(&self, name: String, worker: F) - where - F: FnOnce(watch::Receiver) -> T + Send + 'static, - T: Future + Send + 'static, - { - let stop_signal = self.stop_signal.clone(); - let task = tokio::spawn(async move { - info!("Worker started: {}", name); - worker(stop_signal).await; - info!("Worker exited: {}", name); - }); - self.worker_in - .send(task) - .map_err(|_| "could not put job in queue") - .unwrap(); - } -} - -async fn wait_job(q: &Mutex>) -> Option<(Job, bool)> { - q.lock().await.recv().await -} diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs new file mode 100644 index 00000000..8cc660f8 --- /dev/null +++ b/src/util/background/job_worker.rs @@ -0,0 +1,52 @@ +//! Job worker: a generic worker that just processes incoming +//! jobs one by one + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{mpsc, Mutex}; + +use crate::background::worker::*; +use crate::background::*; + +pub(crate) struct JobWorker { + pub(crate) index: usize, + pub(crate) job_chan: Arc>>, + pub(crate) next_job: Option, +} + +#[async_trait] +impl Worker for JobWorker { + fn name(&self) -> String { + format!("Job worker #{}", self.index) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { + match self.next_job.take() { + None => return Ok(WorkerStatus::Idle), + Some(job) => { + job.await?; + Ok(WorkerStatus::Busy) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver) -> WorkerStatus { + loop { + match self.job_chan.lock().await.recv().await { + Some((job, cancellable)) => { + if cancellable && *must_exit.borrow() { + // skip job + continue; + } + self.next_job = Some(job); + return WorkerStatus::Busy + } + None => return WorkerStatus::Done, + } + } + } +} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..97d25784 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,91 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::{Worker, WorkerProcessor}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver, + ) -> (Arc, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::>(); + + let await_all_done = + tokio::spawn( + async move { WorkerProcessor::new(worker_out, stop_signal).run().await }, + ); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker.send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })).ok().unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + }); + (bgrunner, await_all_done) + } + + /// Spawn a task to be run in background + pub fn spawn(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable(&self, job: T) + where + T: Future + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs new file mode 100644 index 00000000..a173902c --- /dev/null +++ b/src/util/background/worker.rs @@ -0,0 +1,145 @@ +use std::time::{Duration, Instant}; + +use tracing::*; +use async_trait::async_trait; +use futures::future::*; +use tokio::select; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, watch}; + +use crate::error::Error; + +#[derive(PartialEq, Copy, Clone)] +pub enum WorkerStatus { + Busy, + Idle, + Done, +} + +#[async_trait] +pub trait Worker: Send { + fn name(&self) -> String; + async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result; + async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver) -> WorkerStatus; +} + +pub(crate) struct WorkerProcessor { + stop_signal: watch::Receiver, + worker_chan: mpsc::UnboundedReceiver>, +} + +impl WorkerProcessor { + pub(crate) fn new( + worker_chan: mpsc::UnboundedReceiver>, + stop_signal: watch::Receiver, + ) -> Self { + Self { + stop_signal, + worker_chan, + } + } + + pub(crate) async fn run(&mut self) { + let mut workers = FuturesUnordered::new(); + let mut next_task_id = 1; + + while !*self.stop_signal.borrow() { + let await_next_worker = async { + if workers.is_empty() { + futures::future::pending().await + } else { + workers.next().await + } + }; + select! { + new_worker_opt = self.worker_chan.recv() => { + if let Some(new_worker) = new_worker_opt { + let task_id = next_task_id; + next_task_id += 1; + let stop_signal = self.stop_signal.clone(); + workers.push(async move { + let mut worker = WorkerHandler { + task_id, + stop_signal, + worker: new_worker, + status: WorkerStatus::Busy, + }; + worker.step().await; + worker + }.boxed()); + } + } + worker = await_next_worker => { + if let Some(mut worker) = worker { + // TODO save new worker status somewhere + if worker.status == WorkerStatus::Done { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } else { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + } + _ = self.stop_signal.changed() => (), + } + } + + // We are exiting, drain everything + let drain_half_time = Instant::now() + Duration::from_secs(5); + let drain_everything = async move { + while let Some(mut worker) = workers.next().await { + if worker.status == WorkerStatus::Busy + || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time) + { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } else { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } + } + }; + + select! { + _ = drain_everything => { + info!("All workers exited in time \\o/"); + } + _ = tokio::time::sleep(Duration::from_secs(9)) => { + warn!("Some workers could not exit in time, we are cancelling some things in the middle."); + } + } + } +} + +// TODO add tranquilizer +struct WorkerHandler { + task_id: usize, + stop_signal: watch::Receiver, + worker: Box, + status: WorkerStatus, +} + +impl WorkerHandler { + async fn step(&mut self) { + match self.status { + WorkerStatus::Busy => { + match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.status = s; + } + Err(e) => { + error!("Error in worker {}: {}", self.worker.name(), e); + } + } + } + WorkerStatus::Idle => { + self.status = self.worker.wait_for_work(&mut self.stop_signal).await; + } + WorkerStatus::Done => unreachable!() + } + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index 8ca6e310..fce151af 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,7 +11,6 @@ pub mod error; pub mod formater; pub mod metrics; pub mod persister; -//pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer;