diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs deleted file mode 100644 index 3bbc2b86..00000000 --- a/src/garage/admin/mod.rs +++ /dev/null @@ -1,540 +0,0 @@ -mod block; -mod bucket; -mod key; - -use std::collections::HashMap; -use std::fmt::Write; -use std::future::Future; -use std::sync::Arc; - -use futures::future::FutureExt; - -use serde::{Deserialize, Serialize}; - -use format_table::format_table_to_string; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; -use garage_util::error::Error as GarageError; - -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::layout::PARTITION_BITS; -use garage_rpc::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::bucket_table::*; -use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::key_table::*; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::Version; - -use crate::cli::*; -use crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - Stats(StatsOpt), - Worker(WorkerOperation), - BlockOperation(BlockOperation), - MetaOperation(MetaOperation), - - // Replies - Ok(String), - BucketList(Vec), - BucketInfo { - bucket: Bucket, - relevant_keys: HashMap, - counters: HashMap, - mpu_counters: HashMap, - }, - KeyList(Vec<(String, String)>), - KeyInfo(Key, HashMap), - WorkerList( - HashMap, - WorkerListOpt, - ), - WorkerVars(Vec<(Uuid, String, String)>), - WorkerInfo(usize, garage_util::background::WorkerInfo), - BlockErrorList(Vec), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, - }, -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - background: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc, background: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in self.garage.system.get_known_nodes().iter() { - if node.is_up && !all_nodes.contains(&node.id) { - all_nodes.push(node.id); - } - } - - for node in all_nodes.iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - &mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table)?); - table.push(self.gather_table_stats(&self.garage.key_table)?); - table.push(self.gather_table_stats(&self.garage.object_table)?); - table.push(self.gather_table_stats(&self.garage.version_table)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? - ) - .unwrap(); - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics for current nodes - let layout = &self.garage.system.cluster_layout(); - let mut node_partition_count = HashMap::::new(); - for short_id in layout.current().ring_assignment_data.iter() { - let id = layout.current().node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats(&self, t: &Arc>) -> Result - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } - - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) - } - } - - // ================ META DB COMMANDS ==================== - - async fn handle_meta_cmd(self: &Arc, mo: &MetaOperation) -> Result { - match mo { - MetaOperation::Snapshot { all: true } => { - let to = self.garage.system.cluster_layout().all_nodes().to_vec(); - - let resps = futures::future::join_all(to.iter().map(|to| async move { - let to = (*to).into(); - self.endpoint - .call( - &to, - AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), - PRIO_NORMAL, - ) - .await? - })) - .await; - - let mut ret = vec![]; - for (to, resp) in to.iter().zip(resps.iter()) { - let res_str = match resp { - Ok(_) => "ok".to_string(), - Err(e) => format!("error: {}", e), - }; - ret.push(format!("{:?}\t{}", to, res_str)); - } - - if resps.iter().any(Result::is_err) { - Err(GarageError::Message(format_table_to_string(ret)).into()) - } else { - Ok(AdminRpc::Ok(format_table_to_string(ret))) - } - } - MetaOperation::Snapshot { all: false } => { - garage_model::snapshot::async_snapshot_metadata(&self.garage).await?; - Ok(AdminRpc::Ok("Snapshot has been saved.".into())) - } - } - } -} - -impl EndpointHandler for AdminRpcHandler { - fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> impl Future> + Send { - let self2 = self.clone(); - async move { - match message { - AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } - .boxed() - } -} diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/local/convert_db.rs similarity index 100% rename from src/garage/cli/convert_db.rs rename to src/garage/cli/local/convert_db.rs diff --git a/src/garage/cli/init.rs b/src/garage/cli/local/init.rs similarity index 100% rename from src/garage/cli/init.rs rename to src/garage/cli/local/init.rs diff --git a/src/garage/cli/local/mod.rs b/src/garage/cli/local/mod.rs new file mode 100644 index 00000000..476010b8 --- /dev/null +++ b/src/garage/cli/local/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod convert_db; +pub(crate) mod init; +pub(crate) mod repair; diff --git a/src/garage/cli/repair.rs b/src/garage/cli/local/repair.rs similarity index 100% rename from src/garage/cli/repair.rs rename to src/garage/cli/local/repair.rs diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index 146fac56..60e9a5de 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,5 +1,4 @@ -pub(crate) mod structs; +pub mod structs; -pub(crate) mod convert_db; -pub(crate) mod init; -pub(crate) mod repair; +pub mod local; +pub mod remote; diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli/remote/block.rs similarity index 99% rename from src/garage/cli_v2/block.rs rename to src/garage/cli/remote/block.rs index bfc0db4a..933dcbdb 100644 --- a/src/garage/cli_v2/block.rs +++ b/src/garage/cli/remote/block.rs @@ -5,8 +5,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli/remote/bucket.rs similarity index 99% rename from src/garage/cli_v2/bucket.rs rename to src/garage/cli/remote/bucket.rs index c25c2c3e..9adcdbe5 100644 --- a/src/garage/cli_v2/bucket.rs +++ b/src/garage/cli/remote/bucket.rs @@ -5,8 +5,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_bucket(&self, cmd: BucketOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli/remote/cluster.rs similarity index 98% rename from src/garage/cli_v2/cluster.rs rename to src/garage/cli/remote/cluster.rs index 6eb65d12..9639df8b 100644 --- a/src/garage/cli_v2/cluster.rs +++ b/src/garage/cli/remote/cluster.rs @@ -4,9 +4,9 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::layout::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::layout::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_status(&self) -> Result<(), Error> { diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli/remote/key.rs similarity index 99% rename from src/garage/cli_v2/key.rs rename to src/garage/cli/remote/key.rs index b956906d..67843a83 100644 --- a/src/garage/cli_v2/key.rs +++ b/src/garage/cli/remote/key.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_key(&self, cmd: KeyOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli/remote/layout.rs similarity index 99% rename from src/garage/cli_v2/layout.rs rename to src/garage/cli/remote/layout.rs index bab6f28e..cd8f99f4 100644 --- a/src/garage/cli_v2/layout.rs +++ b/src/garage/cli/remote/layout.rs @@ -6,8 +6,8 @@ use garage_util::error::*; use garage_api_admin::api::*; use garage_rpc::layout; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli/remote/mod.rs similarity index 100% rename from src/garage/cli_v2/mod.rs rename to src/garage/cli/remote/mod.rs diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli/remote/node.rs similarity index 99% rename from src/garage/cli_v2/node.rs rename to src/garage/cli/remote/node.rs index c5d0cdea..419d6bf7 100644 --- a/src/garage/cli_v2/node.rs +++ b/src/garage/cli/remote/node.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli/remote/worker.rs similarity index 99% rename from src/garage/cli_v2/worker.rs rename to src/garage/cli/remote/worker.rs index 9c248a39..45f0b3cd 100644 --- a/src/garage/cli_v2/worker.rs +++ b/src/garage/cli/remote/worker.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 58d066b3..0af92c35 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -2,7 +2,7 @@ use structopt::StructOpt; use garage_util::version::garage_version; -use crate::cli::convert_db; +use crate::cli::local::convert_db; #[derive(StructOpt, Debug)] pub enum Command { diff --git a/src/garage/main.rs b/src/garage/main.rs index 5d392c44..a72b860c 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -5,7 +5,6 @@ extern crate tracing; mod cli; -mod cli_v2; mod secrets; mod server; #[cfg(feature = "telemetry-otlp")] @@ -144,13 +143,13 @@ async fn main() { let res = match opt.cmd { Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await + cli::local::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::ConvertDb(conv_opt) => { - cli::convert_db::do_conversion(conv_opt).map_err(From::from) + cli::local::convert_db::do_conversion(conv_opt).map_err(From::from) } Command::Node(NodeOperation::NodeId(node_id_opt)) => { - cli::init::node_id_command(opt.config_file, node_id_opt.quiet) + cli::local::init::node_id_command(opt.config_file, node_id_opt.quiet) } Command::AdminApiSchema => { println!( @@ -260,7 +259,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) - .err_context(cli::init::READ_KEY_ERROR)?; + .err_context(cli::local::init::READ_KEY_ERROR)?; if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) { use std::net::ToSocketAddrs; let a = a @@ -291,7 +290,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { let proxy_rpc_endpoint = netapp.endpoint::(PROXY_RPC_PATH.into()); - let cli = cli_v2::Cli { + let cli = cli::remote::Cli { proxy_rpc_endpoint, rpc_host: id, };