cli_v2: implement Get{Node,Cluster}Statistics

Alex 2025-02-05 15:06:10 +01:00
parent 9f468b4439
commit 406b6da163
8 changed files with 259 additions and 229 deletions

Cargo.lock (generated)

@@ -1277,7 +1277,9 @@ version = "1.0.1"
 dependencies = [
  "argon2",
  "async-trait",
+ "bytesize",
  "err-derive",
+ "format_table",
  "futures",
  "garage_api_common",
  "garage_model",


@@ -14,6 +14,7 @@ path = "lib.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
+format_table.workspace = true
 garage_model.workspace = true
 garage_table.workspace = true
 garage_util.workspace = true
@@ -22,6 +23,7 @@ garage_api_common.workspace = true
 argon2.workspace = true
 async-trait.workspace = true
+bytesize.workspace = true
 err-derive.workspace = true
 hex.workspace = true
 paste.workspace = true


@@ -79,6 +79,8 @@ admin_endpoints![
     // Node operations
     CreateMetadataSnapshot,
+    GetNodeStatistics,
+    GetClusterStatistics,
 
     // Worker operations
     ListWorkers,
@@ -96,6 +98,7 @@ admin_endpoints![
 local_admin_endpoints![
     // Node operations
     CreateMetadataSnapshot,
+    GetNodeStatistics,
 
     // Background workers
     ListWorkers,
     GetWorkerInfo,
@@ -640,6 +643,26 @@ pub struct LocalCreateMetadataSnapshotRequest;
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct LocalCreateMetadataSnapshotResponse;
 
+// ---- GetNodeStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalGetNodeStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetNodeStatisticsResponse {
+    pub freeform: String,
+}
+
+// ---- GetClusterStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GetClusterStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterStatisticsResponse {
+    pub freeform: String,
+}
+
 // **********************************************
 // Worker operations
 // **********************************************
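
Endpoints declared in local_admin_endpoints! also get a generated multi-node wrapper: the Local* request is embedded in a request carrying a node selector, and the response collects one entry per node. A rough sketch of the generated shape for GetNodeStatistics, inferred from how cli_v2 uses it further down in this commit (the exact map types are an assumption):

    pub struct GetNodeStatisticsRequest {
        pub node: String,                        // hex node id, or "*" for all nodes
        pub body: LocalGetNodeStatisticsRequest,
    }

    pub struct GetNodeStatisticsResponse {
        pub success: HashMap<String, LocalGetNodeStatisticsResponse>, // per-node results
        pub error: HashMap<String, String>,                           // per-node error messages
    }

GetClusterStatistics, by contrast, is a plain cluster-level endpoint with no Local* counterpart.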


@@ -1,7 +1,19 @@
+use std::collections::HashMap;
+use std::fmt::Write;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 
+use format_table::format_table_to_string;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::layout::PARTITION_BITS;
+
 use garage_model::garage::Garage;
 
 use crate::api::*;
@@ -21,3 +33,189 @@ impl RequestHandler for LocalCreateMetadataSnapshotRequest {
         Ok(LocalCreateMetadataSnapshotResponse)
     }
 }
+
+#[async_trait]
+impl RequestHandler for LocalGetNodeStatisticsRequest {
+    type Response = LocalGetNodeStatisticsResponse;
+
+    // FIXME: return this as a JSON struct instead of text
+    async fn handle(
+        self,
+        garage: &Arc<Garage>,
+        _admin: &Admin,
+    ) -> Result<LocalGetNodeStatisticsResponse, Error> {
+        let mut ret = String::new();
+        writeln!(
+            &mut ret,
+            "Garage version: {} [features: {}]\nRust compiler version: {}",
+            garage_util::version::garage_version(),
+            garage_util::version::garage_features()
+                .map(|list| list.join(", "))
+                .unwrap_or_else(|| "(unknown)".into()),
+            garage_util::version::rust_version(),
+        )
+        .unwrap();
+
+        writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap();
+
+        // Gather table statistics
+        let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+        table.push(gather_table_stats(&garage.bucket_table)?);
+        table.push(gather_table_stats(&garage.key_table)?);
+        table.push(gather_table_stats(&garage.object_table)?);
+        table.push(gather_table_stats(&garage.version_table)?);
+        table.push(gather_table_stats(&garage.block_ref_table)?);
+        write!(
+            &mut ret,
+            "\nTable stats:\n{}",
+            format_table_to_string(table)
+        )
+        .unwrap();
+
+        // Gather block manager statistics
+        writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+        let rc_len = garage.block_manager.rc_len()?.to_string();
+        writeln!(
+            &mut ret,
+            " number of RC entries (~= number of blocks): {}",
+            rc_len
+        )
+        .unwrap();
+        writeln!(
+            &mut ret,
+            " resync queue length: {}",
+            garage.block_manager.resync.queue_len()?
+        )
+        .unwrap();
+        writeln!(
+            &mut ret,
+            " blocks with resync errors: {}",
+            garage.block_manager.resync.errors_len()?
+        )
+        .unwrap();
+
+        Ok(LocalGetNodeStatisticsResponse { freeform: ret })
+    }
+}
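
For reference, the freeform text assembled above should come out roughly as follows. Values are placeholders reconstructed from the writeln! calls, not taken from a real node, and row labels are whatever each schema's TABLE_NAME reports:

    Garage version: <version> [features: <feature list>]
    Rust compiler version: <rustc version>

    Database engine: <engine>

    Table stats:
      Table        Items  MklItems  MklTodo  GcTodo
      <TABLE_NAME> <n>    <n>       <n>      <n>    (one row each for bucket, key, object, version, block_ref)

    Block manager stats:
      number of RC entries (~= number of blocks): <n>
      resync queue length: <n>
      blocks with resync errors: <n>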
+
+#[async_trait]
+impl RequestHandler for GetClusterStatisticsRequest {
+    type Response = GetClusterStatisticsResponse;
+
+    // FIXME: return this as a JSON struct instead of text
+    async fn handle(
+        self,
+        garage: &Arc<Garage>,
+        _admin: &Admin,
+    ) -> Result<GetClusterStatisticsResponse, Error> {
+        let mut ret = String::new();
+
+        // Gather storage node and free space statistics for current nodes
+        let layout = &garage.system.cluster_layout();
+        let mut node_partition_count = HashMap::<Uuid, u64>::new();
+        for short_id in layout.current().ring_assignment_data.iter() {
+            let id = layout.current().node_id_vec[*short_id as usize];
+            *node_partition_count.entry(id).or_default() += 1;
+        }
+        let node_info = garage
+            .system
+            .get_known_nodes()
+            .into_iter()
+            .map(|n| (n.id, n))
+            .collect::<HashMap<_, _>>();
+
+        let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+        for (id, parts) in node_partition_count.iter() {
+            let info = node_info.get(id);
+            let status = info.map(|x| &x.status);
+            let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+            let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
+            let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+            let capacity = role
+                .map(|x| x.capacity_string())
+                .unwrap_or_else(|| "?".into());
+            let avail_str = |x| match x {
+                Some((avail, total)) => {
+                    let pct = (avail as f64) / (total as f64) * 100.;
+                    let avail = bytesize::ByteSize::b(avail);
+                    let total = bytesize::ByteSize::b(total);
+                    format!("{}/{} ({:.1}%)", avail, total, pct)
+                }
+                None => "?".into(),
+            };
+            let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+            let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+            table.push(format!(
+                " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+                id, hostname, zone, capacity, parts, data_avail, meta_avail
+            ));
+        }
+        write!(
+            &mut ret,
+            "Storage nodes:\n{}",
+            format_table_to_string(table)
+        )
+        .unwrap();
+
+        let meta_part_avail = node_partition_count
+            .iter()
+            .filter_map(|(id, parts)| {
+                node_info
+                    .get(id)
+                    .and_then(|x| x.status.meta_disk_avail)
+                    .map(|c| c.0 / *parts)
+            })
+            .collect::<Vec<_>>();
+        let data_part_avail = node_partition_count
+            .iter()
+            .filter_map(|(id, parts)| {
+                node_info
+                    .get(id)
+                    .and_then(|x| x.status.data_disk_avail)
+                    .map(|c| c.0 / *parts)
+            })
+            .collect::<Vec<_>>();
+        if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+            let meta_avail =
+                bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+            let data_avail =
+                bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+            writeln!(
+                &mut ret,
+                "\nEstimated available storage space cluster-wide (might be lower in practice):"
+            )
+            .unwrap();
+            if meta_part_avail.len() < node_partition_count.len()
+                || data_part_avail.len() < node_partition_count.len()
+            {
+                writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+                writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+                writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+            } else {
+                writeln!(&mut ret, " data: {}", data_avail).unwrap();
+                writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+            }
+        }
+
+        Ok(GetClusterStatisticsResponse { freeform: ret })
+    }
+}
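
The estimate is deliberately conservative: each node's available space is divided by the number of partitions it holds, and the smallest per-partition figure across the cluster is scaled back up by the total partition count (1 << PARTITION_BITS). A hedged worked example, assuming PARTITION_BITS = 8, i.e. 256 partitions:

    // Illustrative numbers only, not from a real deployment.
    // Node A: 600 GB free, holds 192 partitions -> 3.125 GB per partition.
    // Node B: 900 GB free, holds 256 partitions -> ~3.52 GB per partition.
    let per_part_a = 600_000_000_000u64 / 192;
    let per_part_b = 900_000_000_000u64 / 256;
    // Node A is the bottleneck; scale its per-partition figure back up:
    let estimate = per_part_a.min(per_part_b) * (1 << 8); // = 800 GB cluster-wide

Since every partition must fit on the most constrained node that stores it, the real usable space can only be lower, hence the "might be lower in practice" wording.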
+
+fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static,
+{
+    let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
+    let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
+
+    Ok(format!(
+        " {}\t{}\t{}\t{}\t{}",
+        F::TABLE_NAME,
+        data_len,
+        mkl_len,
+        t.merkle_updater.todo_len()?,
+        t.data.gc_todo_len()?
+    ))
+}
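
The four columns map one-to-one onto the accessors used here: Items is the entry count of the table's local data store, MklItems the size of its Merkle tree, MklTodo the backlog of the Merkle updater, and GcTodo the number of entries queued for garbage collection.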


@@ -61,6 +61,8 @@ impl AdminApiRequest {
         POST RemoveBucketAlias (body),
         // Node APIs
         POST CreateMetadataSnapshot (default::body, query::node),
+        GET GetNodeStatistics (default::body, query::node),
+        GET GetClusterStatistics (),
         // Worker APIs
         POST ListWorkers (body_field, query::node),
         POST GetWorkerInfo (body_field, query::node),
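
Assuming the v2 router keeps its usual convention of exposing each variant under its own name, with the node selector passed as the node query parameter, the new endpoints should be reachable as something like (paths are an assumption, not taken from this diff):

    GET /v2/GetNodeStatistics?node=<hex node id>   (or node=* for all nodes)
    GET /v2/GetClusterStatistics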


@@ -1,20 +1,11 @@
-use std::collections::HashMap;
-use std::fmt::Write;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 
-use format_table::format_table_to_string;
-
 use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
 use garage_util::error::Error as GarageError;
 
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_rpc::layout::PARTITION_BITS;
 use garage_rpc::*;
 
 use garage_model::garage::Garage;
@@ -29,7 +20,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
 #[allow(clippy::large_enum_variant)]
 pub enum AdminRpc {
     LaunchRepair(RepairOpt),
-    Stats(StatsOpt),
 
     // Replies
     Ok(String),
@@ -101,219 +91,6 @@ impl AdminRpcHandler {
         )))
     }
 }
-    // ================ STATS COMMANDS ====================
-
-    async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
-        if opt.all_nodes {
-            let mut ret = String::new();
-            let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
-
-            for node in all_nodes.iter() {
-                let mut opt = opt.clone();
-                opt.all_nodes = false;
-                opt.skip_global = true;
-
-                writeln!(&mut ret, "\n======================").unwrap();
-                writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
-
-                let node_id = (*node).into();
-                match self
-                    .endpoint
-                    .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
-                    .await
-                {
-                    Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
-                    Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
-                    Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
-                    Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
-                }
-            }
-
-            writeln!(&mut ret, "\n======================").unwrap();
-            write!(
-                &mut ret,
-                "Cluster statistics:\n\n{}",
-                self.gather_cluster_stats()
-            )
-            .unwrap();
-
-            Ok(AdminRpc::Ok(ret))
-        } else {
-            Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
-        }
-    }
-
-    fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
-        let mut ret = String::new();
-        writeln!(
-            &mut ret,
-            "\nGarage version: {} [features: {}]\nRust compiler version: {}",
-            garage_util::version::garage_version(),
-            garage_util::version::garage_features()
-                .map(|list| list.join(", "))
-                .unwrap_or_else(|| "(unknown)".into()),
-            garage_util::version::rust_version(),
-        )
-        .unwrap();
-
-        writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
-
-        // Gather table statistics
-        let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
-        table.push(self.gather_table_stats(&self.garage.bucket_table)?);
-        table.push(self.gather_table_stats(&self.garage.key_table)?);
-        table.push(self.gather_table_stats(&self.garage.object_table)?);
-        table.push(self.gather_table_stats(&self.garage.version_table)?);
-        table.push(self.gather_table_stats(&self.garage.block_ref_table)?);
-        write!(
-            &mut ret,
-            "\nTable stats:\n{}",
-            format_table_to_string(table)
-        )
-        .unwrap();
-
-        // Gather block manager statistics
-        writeln!(&mut ret, "\nBlock manager stats:").unwrap();
-        let rc_len = self.garage.block_manager.rc_len()?.to_string();
-        writeln!(
-            &mut ret,
-            " number of RC entries (~= number of blocks): {}",
-            rc_len
-        )
-        .unwrap();
-        writeln!(
-            &mut ret,
-            " resync queue length: {}",
-            self.garage.block_manager.resync.queue_len()?
-        )
-        .unwrap();
-        writeln!(
-            &mut ret,
-            " blocks with resync errors: {}",
-            self.garage.block_manager.resync.errors_len()?
-        )
-        .unwrap();
-
-        if !opt.skip_global {
-            write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
-        }
-
-        Ok(ret)
-    }
-
-    fn gather_cluster_stats(&self) -> String {
-        let mut ret = String::new();
-
-        // Gather storage node and free space statistics for current nodes
-        let layout = &self.garage.system.cluster_layout();
-        let mut node_partition_count = HashMap::<Uuid, u64>::new();
-        for short_id in layout.current().ring_assignment_data.iter() {
-            let id = layout.current().node_id_vec[*short_id as usize];
-            *node_partition_count.entry(id).or_default() += 1;
-        }
-        let node_info = self
-            .garage
-            .system
-            .get_known_nodes()
-            .into_iter()
-            .map(|n| (n.id, n))
-            .collect::<HashMap<_, _>>();
-
-        let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
-        for (id, parts) in node_partition_count.iter() {
-            let info = node_info.get(id);
-            let status = info.map(|x| &x.status);
-            let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
-            let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
-            let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
-            let capacity = role
-                .map(|x| x.capacity_string())
-                .unwrap_or_else(|| "?".into());
-            let avail_str = |x| match x {
-                Some((avail, total)) => {
-                    let pct = (avail as f64) / (total as f64) * 100.;
-                    let avail = bytesize::ByteSize::b(avail);
-                    let total = bytesize::ByteSize::b(total);
-                    format!("{}/{} ({:.1}%)", avail, total, pct)
-                }
-                None => "?".into(),
-            };
-            let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
-            let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
-            table.push(format!(
-                " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
-                id, hostname, zone, capacity, parts, data_avail, meta_avail
-            ));
-        }
-        write!(
-            &mut ret,
-            "Storage nodes:\n{}",
-            format_table_to_string(table)
-        )
-        .unwrap();
-
-        let meta_part_avail = node_partition_count
-            .iter()
-            .filter_map(|(id, parts)| {
-                node_info
-                    .get(id)
-                    .and_then(|x| x.status.meta_disk_avail)
-                    .map(|c| c.0 / *parts)
-            })
-            .collect::<Vec<_>>();
-        let data_part_avail = node_partition_count
-            .iter()
-            .filter_map(|(id, parts)| {
-                node_info
-                    .get(id)
-                    .and_then(|x| x.status.data_disk_avail)
-                    .map(|c| c.0 / *parts)
-            })
-            .collect::<Vec<_>>();
-        if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
-            let meta_avail =
-                bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
-            let data_avail =
-                bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
-            writeln!(
-                &mut ret,
-                "\nEstimated available storage space cluster-wide (might be lower in practice):"
-            )
-            .unwrap();
-            if meta_part_avail.len() < node_partition_count.len()
-                || data_part_avail.len() < node_partition_count.len()
-            {
-                writeln!(&mut ret, " data: < {}", data_avail).unwrap();
-                writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
-                writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
-            } else {
-                writeln!(&mut ret, " data: {}", data_avail).unwrap();
-                writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
-            }
-        }
-
-        ret
-    }
-
-    fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error>
-    where
-        F: TableSchema + 'static,
-        R: TableReplication + 'static,
-    {
-        let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
-        let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
-
-        Ok(format!(
-            " {}\t{}\t{}\t{}\t{}",
-            F::TABLE_NAME,
-            data_len,
-            mkl_len,
-            t.merkle_updater.todo_len()?,
-            t.data.gc_todo_len()?
-        ))
-    }
 }
 
 #[async_trait]
@@ -325,7 +102,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
     ) -> Result<AdminRpc, Error> {
         match message {
            AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
-           AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
            m => Err(GarageError::unexpected_rpc_message(m).into()),
        }
    }


@@ -45,6 +45,7 @@ impl Cli {
             Command::Worker(wo) => self.cmd_worker(wo).await,
             Command::Block(bo) => self.cmd_block(bo).await,
             Command::Meta(mo) => self.cmd_meta(mo).await,
+            Command::Stats(so) => self.cmd_stats(so).await,
 
             // TODO
             Command::Repair(ro) => cli_v1::cmd_admin(
@@ -54,11 +55,6 @@ impl Cli {
             )
             .await
             .ok_or_message("cli_v1"),
-            Command::Stats(so) => {
-                cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
-                    .await
-                    .ok_or_message("cli_v1")
-            }
 
             _ => unreachable!(),
         }


@@ -33,4 +33,35 @@ impl Cli {
         Ok(())
     }
+
+    pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
+        let res = self
+            .api_request(GetNodeStatisticsRequest {
+                node: if cmd.all_nodes {
+                    "*".to_string()
+                } else {
+                    hex::encode(self.rpc_host)
+                },
+                body: LocalGetNodeStatisticsRequest,
+            })
+            .await?;
+
+        for (node, res) in res.success.iter() {
+            println!("======================");
+            println!("Stats for node {:.16}:\n", node);
+            println!("{}\n", res.freeform);
+        }
+
+        for (node, err) in res.error.iter() {
+            println!("======================");
+            println!("Node {:.16}: error: {}\n", node, err);
+        }
+
+        let res = self.api_request(GetClusterStatisticsRequest).await?;
+        println!("======================");
+        println!("Cluster statistics:\n");
+        println!("{}\n", res.freeform);
+
+        Ok(())
+    }
 }
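
With this wiring, garage stats (and garage stats -a, assuming StatsOpt keeps its existing all-nodes flag) goes through the admin API rather than the removed AdminRpc::Stats path, and prints something shaped like:

    ======================
    Stats for node <first 16 hex chars of id>:

    <freeform node statistics>

    ======================
    Cluster statistics:

    <freeform cluster statistics>

The old skip_global dance is gone: cluster statistics are now fetched exactly once, at the end, instead of being folded into each node's local output.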