Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
9 changed files with 151 additions and 6 deletions
Showing only changes of commit 10c886111e - Show all commits

View file

@ -5,6 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
@ -36,6 +37,7 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt), LaunchRepair(RepairOpt),
Migrate(MigrateOpt), Migrate(MigrateOpt),
Stats(StatsOpt), Stats(StatsOpt),
Worker(WorkerOpt),
// Replies // Replies
Ok(String), Ok(String),
@ -47,6 +49,7 @@ pub enum AdminRpc {
}, },
KeyList(Vec<(String, String)>), KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>), KeyInfo(Key, HashMap<Uuid, Bucket>),
WorkerList(HashMap<usize, garage_util::background::WorkerInfo>),
} }
impl Rpc for AdminRpc { impl Rpc for AdminRpc {
@ -822,6 +825,25 @@ impl AdminRpcHandler {
Ok(()) Ok(())
} }
// ----
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
match opt.cmd {
WorkerCmd::List { busy } => {
let workers = self.garage.background.get_worker_info();
let workers = if busy {
workers
.into_iter()
.filter(|(_, w)| w.status == WorkerStatus::Busy)
.collect()
} else {
workers
};
Ok(AdminRpc::WorkerList(workers))
}
}
}
} }
#[async_trait] #[async_trait]
@ -837,6 +859,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await, AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()), m => Err(GarageError::unexpected_rpc_message(m).into()),
} }
} }

View file

@ -39,6 +39,7 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
} }
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -182,6 +183,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => { AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb); print_key_info(&key, &rb);
} }
AdminRpc::WorkerList(wi) => {
print_worker_info(wi);
}
r => { r => {
error!("Unexpected response: {:?}", r); error!("Unexpected response: {:?}", r);
} }

View file

@ -45,6 +45,10 @@ pub enum Command {
/// Gather node statistics /// Gather node statistics
#[structopt(name = "stats")] #[structopt(name = "stats")]
Stats(StatsOpt), Stats(StatsOpt),
/// Manage background workers
#[structopt(name = "worker")]
Worker(WorkerOpt),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
@ -460,3 +464,20 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")] #[structopt(short = "d", long = "detailed")]
pub detailed: bool, pub detailed: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct WorkerOpt {
#[structopt(subcommand)]
pub cmd: WorkerCmd,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerCmd {
/// List all workers on Garage node
#[structopt(name = "list")]
List {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
busy: bool,
},
}

View file

@ -1,5 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::Uuid; use garage_util::data::Uuid;
use garage_util::error::*; use garage_util::error::*;
@ -235,3 +236,26 @@ pub fn find_matching_node(
Ok(candidates[0]) Ok(candidates[0])
} }
} }
pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {
(
match info.status {
WorkerStatus::Busy => 0,
WorkerStatus::Idle => 1,
WorkerStatus::Done => 2,
},
*tid,
)
});
let mut table = vec![];
for (tid, info) in wi.iter() {
table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name));
if let Some(i) = &info.info {
table.push(format!("\t\t{}", i));
}
}
format_table(table);
}

View file

@ -332,7 +332,16 @@ where
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
fn name(&self) -> String { fn name(&self) -> String {
format!("Table GC: {}", F::TABLE_NAME) format!("{} GC", F::TABLE_NAME)
}
fn info(&self) -> Option<String> {
let l = self.gc.data.gc_todo_len().unwrap_or(0);
if l > 0 {
Some(format!("{} items in queue", l))
} else {
None
}
} }
async fn work( async fn work(

View file

@ -310,7 +310,16 @@ where
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
fn name(&self) -> String { fn name(&self) -> String {
format!("Merkle tree updater: {}", F::TABLE_NAME) format!("{} Merkle tree updater", F::TABLE_NAME)
}
fn info(&self) -> Option<String> {
let l = self.0.todo_len().unwrap_or(0);
if l > 0 {
Some(format!("{} items in queue", l))
} else {
None
}
} }
async fn work( async fn work(

View file

@ -574,7 +574,16 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
#[async_trait] #[async_trait]
impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
fn name(&self) -> String { fn name(&self) -> String {
format!("Table sync worker for {}", F::TABLE_NAME) format!("{} sync", F::TABLE_NAME)
}
fn info(&self) -> Option<String> {
let l = self.todo.len();
if l > 0 {
Some(format!("{} partitions remaining", l))
} else {
None
}
} }
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {

View file

@ -4,9 +4,12 @@ pub mod job_worker;
pub mod worker; pub mod worker;
use core::future::Future; use core::future::Future;
use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex}; use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error; use crate::error::Error;
@ -20,6 +23,14 @@ pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner { pub struct BackgroundRunner {
send_job: mpsc::UnboundedSender<(Job, bool)>, send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
pub name: String,
pub info: Option<String>,
pub status: WorkerStatus,
} }
impl BackgroundRunner { impl BackgroundRunner {
@ -30,8 +41,13 @@ impl BackgroundRunner {
) -> (Arc<Self>, tokio::task::JoinHandle<()>) { ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let await_all_done = let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await }); let mut worker_processor =
WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
let await_all_done = tokio::spawn(async move {
worker_processor.run().await;
});
let (send_job, queue_out) = mpsc::unbounded_channel(); let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out)); let queue_out = Arc::new(Mutex::new(queue_out));
@ -52,10 +68,15 @@ impl BackgroundRunner {
let bgrunner = Arc::new(Self { let bgrunner = Arc::new(Self {
send_job, send_job,
send_worker, send_worker,
worker_info,
}); });
(bgrunner, await_all_done) (bgrunner, await_all_done)
} }
pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
self.worker_info.lock().unwrap().clone()
}
/// Spawn a task to be run in background /// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T) pub fn spawn<T>(&self, job: T)
where where

View file

@ -1,16 +1,20 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_trait::async_trait; use async_trait::async_trait;
use futures::future::*; use futures::future::*;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tracing::*; use tracing::*;
use crate::background::WorkerInfo;
use crate::error::Error; use crate::error::Error;
#[derive(PartialEq, Copy, Clone, Debug)] #[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus { pub enum WorkerStatus {
Busy, Busy,
Idle, Idle,
@ -21,6 +25,10 @@ pub enum WorkerStatus {
pub trait Worker: Send { pub trait Worker: Send {
fn name(&self) -> String; fn name(&self) -> String;
fn info(&self) -> Option<String> {
None
}
/// Work: do a basic unit of work, if one is available (otherwise, should return /// Work: do a basic unit of work, if one is available (otherwise, should return
/// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
/// middle of processing, it will only be interrupted at the last minute when Garage is trying /// middle of processing, it will only be interrupted at the last minute when Garage is trying
@ -39,16 +47,19 @@ pub trait Worker: Send {
pub(crate) struct WorkerProcessor { pub(crate) struct WorkerProcessor {
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
} }
impl WorkerProcessor { impl WorkerProcessor {
pub(crate) fn new( pub(crate) fn new(
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
) -> Self { ) -> Self {
Self { Self {
stop_signal, stop_signal,
worker_chan, worker_chan,
worker_info,
} }
} }
@ -87,6 +98,20 @@ impl WorkerProcessor {
worker = await_next_worker => { worker = await_next_worker => {
if let Some(mut worker) = worker { if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.status = worker.status;
i.info = worker.worker.info();
}
None => {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
status: worker.status,
info: worker.worker.info(),
});
}
}
// TODO save new worker status somewhere // TODO save new worker status somewhere
if worker.status == WorkerStatus::Done { if worker.status == WorkerStatus::Done {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);