First try on background worker manager
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Alex 2022-06-21 12:37:52 +02:00
parent 77e3fd6db2
commit e12bc3b595
Signed by: lx
GPG Key ID: 0E496D15096376BE
9 changed files with 293 additions and 164 deletions

3
Cargo.lock generated
View File

@ -1065,11 +1065,11 @@ dependencies = [
"err-derive 0.3.1",
"heed",
"hexdump",
"log",
"mktemp",
"pretty_env_logger",
"rusqlite",
"sled",
"tracing",
]
[[package]]
@ -1258,6 +1258,7 @@ dependencies = [
name = "garage_util"
version = "0.7.0"
dependencies = [
"async-trait",
"blake2",
"chrono",
"err-derive 0.3.1",

View File

@ -19,7 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
log = "0.4"
tracing = "0.1.30"
heed = "0.11"
rusqlite = { version = "0.27", features = ["bundled"] }

View File

@ -6,7 +6,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
use log::trace;
use tracing::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction};

View File

@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }

View File

@ -1,160 +0,0 @@
//! Job runner for futures and async functions
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use futures::future::*;
use futures::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner {
stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (worker_in, mut worker_out) = mpsc::unbounded_channel();
let stop_signal_2 = stop_signal.clone();
let await_all_done = tokio::spawn(async move {
let mut workers = FuturesUnordered::new();
let mut shutdown_timer = 0;
loop {
let closed = match worker_out.try_recv() {
Ok(wkr) => {
workers.push(wkr);
false
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
};
select! {
res = workers.next() => {
if let Some(Err(e)) = res {
error!("Worker exited with error: {}", e);
}
}
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if closed || *stop_signal_2.borrow() {
shutdown_timer += 1;
if shutdown_timer >= 10 {
break;
}
}
}
}
}
});
let (queue_in, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
for i in 0..n_runners {
let queue_out = queue_out.clone();
let stop_signal = stop_signal.clone();
worker_in
.send(tokio::spawn(async move {
loop {
let (job, cancellable) = {
select! {
item = wait_job(&queue_out).fuse() => match item {
// We received a task, process it
Some(x) => x,
// We received a signal that no more tasks will ever be sent
// because the sending side was dropped. Exit now.
None => break,
},
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
if *stop_signal.borrow() {
// Nothing has been going on for 5 secs, and we are shutting
// down. Exit now.
break;
} else {
// Nothing is going on but we don't want to exit.
continue;
}
}
}
};
if cancellable && *stop_signal.borrow() {
continue;
}
if let Err(e) = job.await {
error!("Job failed: {}", e)
}
}
info!("Background worker {} exiting", i);
}))
.unwrap();
}
let bgrunner = Arc::new(Self {
stop_signal,
queue_in,
worker_in,
});
(bgrunner, await_all_done)
}
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.queue_in
.send((boxed, false))
.map_err(|_| "could not put job in queue")
.unwrap();
}
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.queue_in
.send((boxed, true))
.map_err(|_| "could not put job in queue")
.unwrap();
}
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
let stop_signal = self.stop_signal.clone();
let task = tokio::spawn(async move {
info!("Worker started: {}", name);
worker(stop_signal).await;
info!("Worker exited: {}", name);
});
self.worker_in
.send(task)
.map_err(|_| "could not put job in queue")
.unwrap();
}
}
async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
q.lock().await.recv().await
}

View File

@ -0,0 +1,52 @@
//! Job worker: a generic worker that just processes incoming
//! jobs one by one
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use crate::background::worker::*;
use crate::background::*;
pub(crate) struct JobWorker {
pub(crate) index: usize,
pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
pub(crate) next_job: Option<Job>,
}
#[async_trait]
impl Worker for JobWorker {
fn name(&self) -> String {
format!("Job worker #{}", self.index)
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
match self.next_job.take() {
None => return Ok(WorkerStatus::Idle),
Some(job) => {
job.await?;
Ok(WorkerStatus::Busy)
}
}
}
async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
loop {
match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => {
if cancellable && *must_exit.borrow() {
// skip job
continue;
}
self.next_job = Some(job);
return WorkerStatus::Busy
}
None => return WorkerStatus::Done,
}
}
}
}

View File

@ -0,0 +1,91 @@
//! Job runner for futures and async functions
pub mod job_worker;
pub mod worker;
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
use worker::{Worker, WorkerProcessor};
pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner {
send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
}
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let await_all_done =
tokio::spawn(
async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
);
let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
for i in 0..n_runners {
let queue_out = queue_out.clone();
send_worker.send(Box::new(job_worker::JobWorker {
index: i,
job_chan: queue_out.clone(),
next_job: None,
})).ok().unwrap();
}
let bgrunner = Arc::new(Self {
send_job,
send_worker,
});
(bgrunner, await_all_done)
}
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.send_job
.send((boxed, false))
.ok()
.expect("Could not put job in queue");
}
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.send_job
.send((boxed, true))
.ok()
.expect("Could not put job in queue");
}
pub fn spawn_worker<W>(&self, worker: W)
where
W: Worker + 'static,
{
self.send_worker
.send(Box::new(worker))
.ok()
.expect("Could not put worker in queue");
}
}

View File

@ -0,0 +1,145 @@
use std::time::{Duration, Instant};
use tracing::*;
use async_trait::async_trait;
use futures::future::*;
use tokio::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, watch};
use crate::error::Error;
#[derive(PartialEq, Copy, Clone)]
pub enum WorkerStatus {
Busy,
Idle,
Done,
}
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
}
pub(crate) struct WorkerProcessor {
stop_signal: watch::Receiver<bool>,
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
}
impl WorkerProcessor {
pub(crate) fn new(
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
stop_signal: watch::Receiver<bool>,
) -> Self {
Self {
stop_signal,
worker_chan,
}
}
pub(crate) async fn run(&mut self) {
let mut workers = FuturesUnordered::new();
let mut next_task_id = 1;
while !*self.stop_signal.borrow() {
let await_next_worker = async {
if workers.is_empty() {
futures::future::pending().await
} else {
workers.next().await
}
};
select! {
new_worker_opt = self.worker_chan.recv() => {
if let Some(new_worker) = new_worker_opt {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
workers.push(async move {
let mut worker = WorkerHandler {
task_id,
stop_signal,
worker: new_worker,
status: WorkerStatus::Busy,
};
worker.step().await;
worker
}.boxed());
}
}
worker = await_next_worker => {
if let Some(mut worker) = worker {
// TODO save new worker status somewhere
if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else {
workers.push(async move {
worker.step().await;
worker
}.boxed());
}
}
}
_ = self.stop_signal.changed() => (),
}
}
// We are exiting, drain everything
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
if worker.status == WorkerStatus::Busy
|| (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
{
workers.push(async move {
worker.step().await;
worker
}.boxed());
} else {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
}
}
};
select! {
_ = drain_everything => {
info!("All workers exited in time \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
}
}
}
}
// TODO add tranquilizer
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
}
impl WorkerHandler {
async fn step(&mut self) {
match self.status {
WorkerStatus::Busy => {
match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.status = s;
}
Err(e) => {
error!("Error in worker {}: {}", self.worker.name(), e);
}
}
}
WorkerStatus::Idle => {
self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
}
WorkerStatus::Done => unreachable!()
}
}
}

View File

@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;