diff --git a/Cargo.lock b/Cargo.lock index 659e2fe7..9ba0d553 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1277,7 +1277,9 @@ version = "1.0.1" dependencies = [ "argon2", "async-trait", + "bytesize", "err-derive", + "format_table", "futures", "garage_api_common", "garage_model", diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml index 94a321a6..7b1ad2f0 100644 --- a/src/api/admin/Cargo.toml +++ b/src/api/admin/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_model.workspace = true garage_table.workspace = true garage_util.workspace = true @@ -22,6 +23,7 @@ garage_api_common.workspace = true argon2.workspace = true async-trait.workspace = true +bytesize.workspace = true err-derive.workspace = true hex.workspace = true paste.workspace = true diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 3f041208..4caae02c 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -79,6 +79,8 @@ admin_endpoints![ // Node operations CreateMetadataSnapshot, + GetNodeStatistics, + GetClusterStatistics, // Worker operations ListWorkers, @@ -96,6 +98,7 @@ admin_endpoints![ local_admin_endpoints![ // Node operations CreateMetadataSnapshot, + GetNodeStatistics, // Background workers ListWorkers, GetWorkerInfo, @@ -640,6 +643,26 @@ pub struct LocalCreateMetadataSnapshotRequest; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LocalCreateMetadataSnapshotResponse; +// ---- GetNodeStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalGetNodeStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetNodeStatisticsResponse { + pub freeform: String, +} + +// ---- GetClusterStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GetClusterStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterStatisticsResponse { + pub freeform: String, +} + // ********************************************** // Worker operations // ********************************************** diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs index 8c79acfd..870db9fb 100644 --- a/src/api/admin/node.rs +++ b/src/api/admin/node.rs @@ -1,7 +1,19 @@ +use std::collections::HashMap; +use std::fmt::Write; use std::sync::Arc; use async_trait::async_trait; +use format_table::format_table_to_string; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::layout::PARTITION_BITS; + use garage_model::garage::Garage; use crate::api::*; @@ -21,3 +33,189 @@ impl RequestHandler for LocalCreateMetadataSnapshotRequest { Ok(LocalCreateMetadataSnapshotResponse) } } + +#[async_trait] +impl RequestHandler for LocalGetNodeStatisticsRequest { + type Response = LocalGetNodeStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut ret = String::new(); + writeln!( + &mut ret, + "Garage version: {} [features: {}]\nRust compiler version: {}", + garage_util::version::garage_version(), + garage_util::version::garage_features() + .map(|list| list.join(", ")) + .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), + ) + .unwrap(); + + writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap(); + + // Gather table statistics + let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + table.push(gather_table_stats(&garage.bucket_table)?); + table.push(gather_table_stats(&garage.key_table)?); + table.push(gather_table_stats(&garage.object_table)?); + table.push(gather_table_stats(&garage.version_table)?); + table.push(gather_table_stats(&garage.block_ref_table)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + // Gather block manager statistics + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + let rc_len = garage.block_manager.rc_len()?.to_string(); + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); + writeln!( + &mut ret, + " resync queue length: {}", + garage.block_manager.resync.queue_len()? + ) + .unwrap(); + writeln!( + &mut ret, + " blocks with resync errors: {}", + garage.block_manager.resync.errors_len()? + ) + .unwrap(); + + Ok(LocalGetNodeStatisticsResponse { freeform: ret }) + } +} + +#[async_trait] +impl RequestHandler for GetClusterStatisticsRequest { + type Response = GetClusterStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut ret = String::new(); + + // Gather storage node and free space statistics for current nodes + let layout = &garage.system.cluster_layout(); + let mut node_partition_count = HashMap::::new(); + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + Ok(GetClusterStatisticsResponse { freeform: ret }) + } +} + +fn gather_table_stats(t: &Arc>) -> Result +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); + let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); + + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) +} diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index dac6c5f9..a0f415c2 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -61,6 +61,8 @@ impl AdminApiRequest { POST RemoveBucketAlias (body), // Node APIs POST CreateMetadataSnapshot (default::body, query::node), + GET GetNodeStatistics (default::body, query::node), + GET GetClusterStatistics (), // Worker APIs POST ListWorkers (body_field, query::node), POST GetWorkerInfo (body_field, query::node), diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 87724559..c4ab2810 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -1,20 +1,11 @@ -use std::collections::HashMap; -use std::fmt::Write; use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use format_table::format_table_to_string; - use garage_util::background::BackgroundRunner; -use garage_util::data::*; use garage_util::error::Error as GarageError; -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::layout::PARTITION_BITS; use garage_rpc::*; use garage_model::garage::Garage; @@ -29,7 +20,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; #[allow(clippy::large_enum_variant)] pub enum AdminRpc { LaunchRepair(RepairOpt), - Stats(StatsOpt), // Replies Ok(String), @@ -101,219 +91,6 @@ impl AdminRpcHandler { ))) } } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - - for node in all_nodes.iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - &mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table)?); - table.push(self.gather_table_stats(&self.garage.key_table)?); - table.push(self.gather_table_stats(&self.garage.object_table)?); - table.push(self.gather_table_stats(&self.garage.version_table)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? - ) - .unwrap(); - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics for current nodes - let layout = &self.garage.system.cluster_layout(); - let mut node_partition_count = HashMap::::new(); - for short_id in layout.current().ring_assignment_data.iter() { - let id = layout.current().node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats(&self, t: &Arc>) -> Result - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } } #[async_trait] @@ -325,7 +102,6 @@ impl EndpointHandler for AdminRpcHandler { ) -> Result { match message { AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, m => Err(GarageError::unexpected_rpc_message(m).into()), } } diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 0de4ead8..dccdc295 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -45,6 +45,7 @@ impl Cli { Command::Worker(wo) => self.cmd_worker(wo).await, Command::Block(bo) => self.cmd_block(bo).await, Command::Meta(mo) => self.cmd_meta(mo).await, + Command::Stats(so) => self.cmd_stats(so).await, // TODO Command::Repair(ro) => cli_v1::cmd_admin( @@ -54,11 +55,6 @@ impl Cli { ) .await .ok_or_message("cli_v1"), - Command::Stats(so) => { - cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so)) - .await - .ok_or_message("cli_v1") - } _ => unreachable!(), } diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs index c5f28300..b1915dc4 100644 --- a/src/garage/cli_v2/node.rs +++ b/src/garage/cli_v2/node.rs @@ -33,4 +33,35 @@ impl Cli { Ok(()) } + + pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> { + let res = self + .api_request(GetNodeStatisticsRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetNodeStatisticsRequest, + }) + .await?; + + for (node, res) in res.success.iter() { + println!("======================"); + println!("Stats for node {:.16}:\n", node); + println!("{}\n", res.freeform); + } + + for (node, err) in res.error.iter() { + println!("======================"); + println!("Node {:.16}: error: {}\n", node, err); + } + + let res = self.api_request(GetClusterStatisticsRequest).await?; + println!("======================"); + println!("Cluster statistics:\n"); + println!("{}\n", res.freeform); + + Ok(()) + } }