diff --git a/src/block/repair.rs b/src/block/repair.rs index e2884b69..eed40599 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -53,7 +53,7 @@ impl Worker for RepairWorker { "Block repair worker".into() } - fn info(&self) -> Option { + fn status(&self) -> WorkerStatus { match self.block_iter.as_ref() { None => { let idx_bytes = self @@ -66,9 +66,17 @@ impl Worker for RepairWorker { } else { idx_bytes }; - Some(format!("Phase 1: {}", hex::encode(idx_bytes))) + WorkerStatus { + progress: Some("Phase 1".into()), + freeform: vec![format!("Now at: {}", hex::encode(idx_bytes))], + ..Default::default() + } } - Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)), + Some(bi) => WorkerStatus { + progress: Some(format!("{:.2}%", bi.progress() * 100.)), + freeform: vec!["Phase 2".into()], + ..Default::default() + }, } } @@ -271,29 +279,28 @@ impl Worker for ScrubWorker { "Block scrub worker".into() } - fn info(&self) -> Option { - let s = match &self.work { - ScrubWorkerState::Running(bsi) => format!( - "{:.2}% done (tranquility = {})", - bsi.progress() * 100., - self.persisted.tranquility - ), - ScrubWorkerState::Paused(bsi, rt) => { - format!( - "Paused, {:.2}% done, resumes at {}", - bsi.progress() * 100., - msec_to_rfc3339(*rt) - ) - } - ScrubWorkerState::Finished => format!( - "Last completed scrub: {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) - ), + fn status(&self) -> WorkerStatus { + let mut s = WorkerStatus { + persistent_errors: Some(self.persisted.corruptions_detected), + tranquility: Some(self.persisted.tranquility), + ..Default::default() }; - Some(format!( - "{} ; corruptions detected: {}", - s, self.persisted.corruptions_detected - )) + match &self.work { + ScrubWorkerState::Running(bsi) => { + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + } + ScrubWorkerState::Paused(bsi, rt) => { + s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + s.freeform = vec![format!("Paused, resumes at {}", msec_to_rfc3339(*rt))]; + } + ScrubWorkerState::Finished => { + s.freeform = vec![format!( + "Completed {}", + msec_to_rfc3339(self.persisted.time_last_complete_scrub) + )]; + } + } + s } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { diff --git a/src/block/resync.rs b/src/block/resync.rs index ada3ac54..875ead9b 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -477,27 +477,22 @@ impl Worker for ResyncWorker { format!("Block resync worker #{}", self.index + 1) } - fn info(&self) -> Option { + fn status(&self) -> WorkerStatus { let persisted = self.manager.resync.persisted.load(); if self.index >= persisted.n_workers { - return Some("(unused)".into()); + return WorkerStatus { + freeform: vec!["(unused)".into()], + ..Default::default() + }; } - let mut ret = vec![]; - ret.push(format!("tranquility = {}", persisted.tranquility)); - - let qlen = self.manager.resync.queue_len().unwrap_or(0); - if qlen > 0 { - ret.push(format!("{} blocks in queue", qlen)); + WorkerStatus { + queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), + tranquility: Some(persisted.tranquility), + persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), + ..Default::default() } - - let elen = self.manager.resync.errors_len().unwrap_or(0); - if elen > 0 { - ret.push(format!("{} blocks in error state", elen)); - } - - Some(ret.join(", ")) } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 396938ae..1f098b47 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -254,7 +254,7 @@ pub fn print_worker_info(wi: HashMap, wlo: WorkerListOpt) { ) }); - let mut table = vec![]; + let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; for (tid, info) in wi.iter() { if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { continue; @@ -263,33 +263,38 @@ pub fn print_worker_info(wi: HashMap, wlo: WorkerListOpt) { continue; } - table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); - if let Some(i) = &info.info { - table.push(format!("\t\t {}", i)); - } let tf = timeago::Formatter::new(); - let (err_ago, err_msg) = info + let err_ago = info .last_error .as_ref() - .map(|(m, t)| { - ( - tf.convert(Duration::from_millis(now_msec() - t)), - m.as_str(), - ) - }) - .unwrap_or(("(?) ago".into(), "(?)")); - if info.consecutive_errors > 0 { - table.push(format!( - "\t\t {} consecutive errors ({} total), last {}", - info.consecutive_errors, info.errors, err_ago, - )); - table.push(format!("\t\t {}", err_msg)); - } else if info.errors > 0 { - table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,)); - if wlo.errors { - table.push(format!("\t\t {}", err_msg)); - } - } + .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + tid, + info.state, + info.name, + info.status + .tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or("-".into()), + info.status.progress.as_deref().unwrap_or("-"), + info.status + .queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or("-".into()), + total_err, + consec_err, + err_ago, + )); } format_table(table); } diff --git a/src/garage/main.rs b/src/garage/main.rs index edda734b..107b1389 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -127,9 +127,16 @@ async fn main() { std::process::abort(); })); + // Parse arguments and dispatch command line + let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); + // Initialize logging as well as other libraries used in Garage if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "netapp=info,garage=info") + let default_log = match &opt.cmd { + Command::Server => "netapp=info,garage=info", + _ => "netapp=warn,garage=warn", + }; + std::env::set_var("RUST_LOG", default_log) } tracing_subscriber::fmt() .with_writer(std::io::stderr) @@ -137,9 +144,6 @@ async fn main() { .init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); - // Parse arguments and dispatch command line - let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches()); - let res = match opt.cmd { Command::Server => server::run_server(opt.config_file).await, Command::OfflineRepair(repair_opt) => { @@ -182,9 +186,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); // Find and parse the address of the target host - let (id, addr) = if let Some(h) = opt.rpc_host { + let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is @:.", h))?; - (id, addrs[0]) + (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) .err_context(READ_KEY_ERROR)?; @@ -195,24 +199,26 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { .ok_or_message("unable to resolve rpc_public_addr specified in config file")? .next() .ok_or_message("unable to resolve rpc_public_addr specified in config file")?; - (node_id, a) + (node_id, a, false) } else { let default_addr = SocketAddr::new( "127.0.0.1".parse().unwrap(), config.as_ref().unwrap().rpc_bind_addr.port(), ); - warn!( - "Trying to contact Garage node at default address {}", - default_addr - ); - warn!("If this doesn't work, consider adding rpc_public_addr in your config file or specifying the -h command line parameter."); - (node_id, default_addr) + (node_id, default_addr, true) } }; // Connect to target host - netapp.clone().try_connect(addr, id).await - .err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; + if let Err(e) = netapp.clone().try_connect(addr, id).await { + if is_default_addr { + warn!( + "Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.", + addr + ); + } + Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?; + } let system_rpc_endpoint = netapp.endpoint::(SYSTEM_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::(ADMIN_RPC_PATH.into()); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index e33cf097..42221c2a 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -85,8 +85,11 @@ impl Worker for RepairVersionsWorker { "Version repair worker".into() } - fn info(&self) -> Option { - Some(format!("{} items done", self.counter)) + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(self.counter.to_string()), + ..Default::default() + } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { @@ -163,8 +166,11 @@ impl Worker for RepairBlockrefsWorker { "Block refs repair worker".into() } - fn info(&self) -> Option { - Some(format!("{} items done", self.counter)) + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(self.counter.to_string()), + ..Default::default() + } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index e6394f0c..b9594406 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -404,14 +404,13 @@ impl IndexPropagatorWorker { #[async_trait] impl Worker for IndexPropagatorWorker { fn name(&self) -> String { - format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + format!("{} counter", T::COUNTER_TABLE_NAME) } - fn info(&self) -> Option { - if !self.buf.is_empty() { - Some(format!("{} items in queue", self.buf.len())) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.buf.len() as u64), + ..Default::default() } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 83e7eeff..cfdc9d2d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -330,12 +330,10 @@ where format!("{} GC", F::TABLE_NAME) } - fn info(&self) -> Option { - let l = self.gc.data.gc_todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), + ..Default::default() } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a5c29723..6f8a19b6 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -310,15 +310,13 @@ where R: TableReplication + 'static, { fn name(&self) -> String { - format!("{} Merkle tree updater", F::TABLE_NAME) + format!("{} Merkle", F::TABLE_NAME) } - fn info(&self) -> Option { - let l = self.0.todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.todo_len().unwrap_or(0) as u64), + ..Default::default() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 9d79d856..af7aa640 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -570,12 +570,10 @@ impl Worker for SyncWor format!("{} sync", F::TABLE_NAME) } - fn info(&self) -> Option { - let l = self.todo.len(); - if l > 0 { - Some(format!("{} partitions remaining", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.todo.len() as u64), + ..Default::default() } } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 619f5068..fd9258b8 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -29,13 +29,24 @@ pub struct BackgroundRunner { #[derive(Clone, Serialize, Deserialize, Debug)] pub struct WorkerInfo { pub name: String, - pub info: Option, + pub status: WorkerStatus, pub state: WorkerState, pub errors: usize, pub consecutive_errors: usize, pub last_error: Option<(String, u64)>, } +/// WorkerStatus is a struct returned by the worker with a bunch of canonical +/// fields to indicate their status to CLI users. All fields are optional. +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct WorkerStatus { + pub tranquility: Option, + pub progress: Option, + pub queue_length: Option, + pub persistent_errors: Option, + pub freeform: Vec, +} + impl BackgroundRunner { /// Create a new BackgroundRunner pub fn new( diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index f5e3addb..7e9da7f8 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; -use crate::background::WorkerInfo; +use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; @@ -26,7 +26,7 @@ impl std::fmt::Display for WorkerState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { WorkerState::Busy => write!(f, "Busy"), - WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Throttled(_) => write!(f, "Busy*"), WorkerState::Idle => write!(f, "Idle"), WorkerState::Done => write!(f, "Done"), } @@ -37,8 +37,8 @@ impl std::fmt::Display for WorkerState { pub trait Worker: Send { fn name(&self) -> String; - fn info(&self) -> Option { - None + fn status(&self) -> WorkerStatus { + Default::default() } /// Work: do a basic unit of work, if one is available (otherwise, should return @@ -119,7 +119,7 @@ impl WorkerProcessor { match wi.get_mut(&worker.task_id) { Some(i) => { i.state = worker.state; - i.info = worker.worker.info(); + i.status = worker.worker.status(); i.errors = worker.errors; i.consecutive_errors = worker.consecutive_errors; if worker.last_error.is_some() { @@ -130,7 +130,7 @@ impl WorkerProcessor { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), state: worker.state, - info: worker.worker.info(), + status: worker.worker.status(), errors: worker.errors, consecutive_errors: worker.consecutive_errors, last_error: worker.last_error.take(),