From 69ddaafc6061d06d277fe772dfaa7fe64ecafcc1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 30 Jan 2025 12:07:12 +0100 Subject: [PATCH] wip: migrate garage status and garage layout assign --- src/garage/cli/cmd.rs | 203 ----------------------------------- src/garage/cli/layout.rs | 141 ------------------------ src/garage/cli/mod.rs | 1 - src/garage/cli_v2/cluster.rs | 188 ++++++++++++++++++++++++++++++++ src/garage/cli_v2/layout.rs | 119 ++++++++++++++++++++ src/garage/cli_v2/mod.rs | 72 +++++++++++-- src/garage/cli_v2/util.rs | 115 ++++++++++++++++++++ src/garage/main.rs | 2 - 8 files changed, 486 insertions(+), 355 deletions(-) create mode 100644 src/garage/cli_v2/cluster.rs create mode 100644 src/garage/cli_v2/layout.rs create mode 100644 src/garage/cli_v2/util.rs diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 44d3d96c..2b5f93d4 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -1,10 +1,5 @@ -use std::collections::{HashMap, HashSet}; -use std::time::Duration; - -use format_table::format_table; use garage_util::error::*; -use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; @@ -13,204 +8,6 @@ use garage_model::helper::error::Error as HelperError; use crate::admin::*; use crate::cli::*; -pub async fn cli_command_dispatch( - cmd: Command, - system_rpc_endpoint: &Endpoint, - admin_rpc_endpoint: &Endpoint, - rpc_host: NodeID, -) -> Result<(), HelperError> { - match cmd { - Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?), - Command::Node(NodeOperation::Connect(connect_opt)) => { - Ok(cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await?) - } - Command::Layout(layout_opt) => { - Ok(cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await?) - } - Command::Bucket(bo) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await - } - Command::Key(ko) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await - } - Command::Repair(ro) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await - } - Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await, - Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await, - Command::Block(bo) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await - } - Command::Meta(mo) => { - cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await - } - _ => unreachable!(), - } -} - -pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> Result<(), Error> { - let status = fetch_status(rpc_cli, rpc_host).await?; - let layout = fetch_layout(rpc_cli, rpc_host).await?; - - println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = - vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; - for adv in status.iter().filter(|adv| adv.is_up) { - let host = adv.status.hostname.as_deref().unwrap_or("?"); - let addr = match adv.addr { - Some(addr) => addr.to_string(), - None => "N/A".to_string(), - }; - if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) { - let data_avail = match &adv.status.data_disk_avail { - _ if cfg.capacity.is_none() => "N/A".into(), - Some((avail, total)) => { - let pct = (*avail as f64) / (*total as f64) * 100.; - let avail = bytesize::ByteSize::b(*avail); - format!("{} ({:.1}%)", avail, pct) - } - None => "?".into(), - }; - healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", - id = adv.id, - host = host, - addr = addr, - tags = cfg.tags.join(","), - zone = cfg.zone, - capacity = cfg.capacity_string(), - data_avail = data_avail, - )); - } else { - let prev_role = layout - .versions - .iter() - .rev() - .find_map(|x| match x.roles.get(&adv.id) { - Some(NodeRoleV(Some(cfg))) => Some(cfg), - _ => None, - }); - if let Some(cfg) = prev_role { - healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...", - id = adv.id, - host = host, - addr = addr, - tags = cfg.tags.join(","), - zone = cfg.zone, - )); - } else { - let new_role = match layout.staging.get().roles.get(&adv.id) { - Some(NodeRoleV(Some(_))) => "pending...", - _ => "NO ROLE ASSIGNED", - }; - healthy_nodes.push(format!( - "{id:?}\t{h}\t{addr}\t\t\t{new_role}", - id = adv.id, - h = host, - addr = addr, - new_role = new_role, - )); - } - } - } - format_table(healthy_nodes); - - // Determine which nodes are unhealthy and print that to stdout - let status_map = status - .iter() - .map(|adv| (adv.id, adv)) - .collect::>(); - - let tf = timeago::Formatter::new(); - let mut drain_msg = false; - let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()]; - let mut listed = HashSet::new(); - for ver in layout.versions.iter().rev() { - for (node, _, role) in ver.roles.items().iter() { - let cfg = match role { - NodeRoleV(Some(role)) if role.capacity.is_some() => role, - _ => continue, - }; - - if listed.contains(node) { - continue; - } - listed.insert(*node); - - let adv = status_map.get(node); - if adv.map(|x| x.is_up).unwrap_or(false) { - continue; - } - - // Node is in a layout version, is not a gateway node, and is not up: - // it is in a failed state, add proper line to the output - let (host, last_seen) = match adv { - Some(adv) => ( - adv.status.hostname.as_deref().unwrap_or("?"), - adv.last_seen_secs_ago - .map(|s| tf.convert(Duration::from_secs(s))) - .unwrap_or_else(|| "never seen".into()), - ), - None => ("??", "never seen".into()), - }; - let capacity = if ver.version == layout.current().version { - cfg.capacity_string() - } else { - drain_msg = true; - "draining metadata...".to_string() - }; - failed_nodes.push(format!( - "{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", - id = node, - host = host, - tags = cfg.tags.join(","), - zone = cfg.zone, - capacity = capacity, - last_seen = last_seen, - )); - } - } - - if failed_nodes.len() > 1 { - println!("\n==== FAILED NODES ===="); - format_table(failed_nodes); - if drain_msg { - println!(); - println!("Your cluster is expecting to drain data from nodes that are currently unavailable."); - println!("If these nodes are definitely dead, please review the layout history with"); - println!( - "`garage layout history` and use `garage layout skip-dead-nodes` to force progress." - ); - } - } - - if print_staging_role_changes(&layout) { - println!(); - println!("Please use `garage layout show` to check the proposed new layout and apply it."); - println!(); - } - - Ok(()) -} - -pub async fn cmd_connect( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: ConnectNodeOpt, -) -> Result<(), Error> { - match rpc_cli - .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL) - .await?? - { - SystemRpc::Ok => { - println!("Success."); - Ok(()) - } - m => Err(Error::unexpected_rpc_message(m)), - } -} - pub async fn cmd_admin( rpc_cli: &Endpoint, rpc_host: NodeID, diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index f053eef4..d0b62fc7 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -10,147 +10,6 @@ use garage_rpc::*; use crate::cli::*; -pub async fn cli_layout_command_dispatch( - cmd: LayoutOperation, - system_rpc_endpoint: &Endpoint, - rpc_host: NodeID, -) -> Result<(), Error> { - match cmd { - LayoutOperation::Assign(assign_opt) => { - cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await - } - LayoutOperation::Remove(remove_opt) => { - cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await - } - LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await, - LayoutOperation::Apply(apply_opt) => { - cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await - } - LayoutOperation::Revert(revert_opt) => { - cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await - } - LayoutOperation::Config(config_opt) => { - cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await - } - LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await, - LayoutOperation::SkipDeadNodes(assume_sync_opt) => { - cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await - } - } -} - -pub async fn cmd_assign_role( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: AssignRoleOpt, -) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - let all_nodes = layout.get_all_nodes(); - - let added_nodes = args - .node_ids - .iter() - .map(|node_id| { - find_matching_node( - status - .iter() - .map(|adv| adv.id) - .chain(all_nodes.iter().cloned()), - node_id, - ) - }) - .collect::, _>>()?; - - let mut roles = layout.current().roles.clone(); - roles.merge(&layout.staging.get().roles); - - for replaced in args.replace.iter() { - let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?; - match roles.get(&replaced_node) { - Some(NodeRoleV(Some(_))) => { - layout - .staging - .get_mut() - .roles - .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); - } - _ => { - return Err(Error::Message(format!( - "Cannot replace node {:?} as it is not currently in planned layout", - replaced_node - ))); - } - } - } - - if args.capacity.is_some() && args.gateway { - return Err(Error::Message( - "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); - } - if args.capacity == Some(ByteSize::b(0)) { - return Err(Error::Message("Invalid capacity value: 0".into())); - } - - for added_node in added_nodes { - let new_entry = match roles.get(&added_node) { - Some(NodeRoleV(Some(old))) => { - let capacity = match args.capacity { - Some(c) => Some(c.as_u64()), - None if args.gateway => None, - None => old.capacity, - }; - let tags = if args.tags.is_empty() { - old.tags.clone() - } else { - args.tags.clone() - }; - NodeRole { - zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()), - capacity, - tags, - } - } - _ => { - let capacity = match args.capacity { - Some(c) => Some(c.as_u64()), - None if args.gateway => None, - None => return Err(Error::Message( - "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), - }; - NodeRole { - zone: args - .zone - .clone() - .ok_or("Please specify a zone with the -z flag")?, - capacity, - tags: args.tags.clone(), - } - } - }; - - layout - .staging - .get_mut() - .roles - .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); - } - - send_layout(rpc_cli, rpc_host, layout).await?; - - println!("Role changes are staged but not yet committed."); - println!("Use `garage layout show` to view staged role changes,"); - println!("and `garage layout apply` to enact staged changes."); - Ok(()) -} - pub async fn cmd_remove_role( rpc_cli: &Endpoint, rpc_host: NodeID, diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index e131f62c..30f566e2 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -8,6 +8,5 @@ pub(crate) mod convert_db; pub(crate) use cmd::*; pub(crate) use init::*; -pub(crate) use layout::*; pub(crate) use structs::*; pub(crate) use util::*; diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs new file mode 100644 index 00000000..0b5b9559 --- /dev/null +++ b/src/garage/cli_v2/cluster.rs @@ -0,0 +1,188 @@ +use format_table::format_table; + +use garage_util::error::*; + +use garage_api::admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::util::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_status(&self) -> Result<(), Error> { + let status = self.api_request(GetClusterStatusRequest).await?; + let layout = self.api_request(GetClusterLayoutRequest).await?; + // TODO: layout history + + println!("==== HEALTHY NODES ===="); + let mut healthy_nodes = + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; + for adv in status.nodes.iter().filter(|adv| adv.is_up) { + let host = adv.hostname.as_deref().unwrap_or("?"); + let addr = match adv.addr { + Some(addr) => addr.to_string(), + None => "N/A".to_string(), + }; + if let Some(cfg) = &adv.role { + let data_avail = match &adv.data_partition { + _ if cfg.capacity.is_none() => "N/A".into(), + Some(FreeSpaceResp { available, total }) => { + let pct = (*available as f64) / (*total as f64) * 100.; + let avail_str = bytesize::ByteSize::b(*available); + format!("{} ({:.1}%)", avail_str, pct) + } + None => "?".into(), + }; + healthy_nodes.push(format!( + "{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", + id = adv.id, + host = host, + addr = addr, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = capacity_string(cfg.capacity), + data_avail = data_avail, + )); + } else { + /* + let prev_role = layout + .versions + .iter() + .rev() + .find_map(|x| match x.roles.get(&adv.id) { + Some(NodeRoleV(Some(cfg))) => Some(cfg), + _ => None, + }); + */ + let prev_role = Option::::None; //TODO + if let Some(cfg) = prev_role { + healthy_nodes.push(format!( + "{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...", + id = adv.id, + host = host, + addr = addr, + tags = cfg.tags.join(","), + zone = cfg.zone, + )); + } else { + let new_role = match layout.staged_role_changes.iter().find(|x| x.id == adv.id) + { + Some(_) => "pending...", + _ => "NO ROLE ASSIGNED", + }; + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\t\t\t{new_role}", + id = adv.id, + h = host, + addr = addr, + new_role = new_role, + )); + } + } + } + format_table(healthy_nodes); + + // Determine which nodes are unhealthy and print that to stdout + // TODO: do we need this, or can it be done in the GetClusterStatus handler? + let status_map = status + .nodes + .iter() + .map(|adv| (&adv.id, adv)) + .collect::>(); + + let tf = timeago::Formatter::new(); + let mut drain_msg = false; + let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()]; + let mut listed = HashSet::new(); + //for ver in layout.versions.iter().rev() { + for ver in [&layout].iter() { + for cfg in ver.roles.iter() { + let node = &cfg.id; + if listed.contains(node.as_str()) { + continue; + } + listed.insert(node.as_str()); + + let adv = status_map.get(node); + if adv.map(|x| x.is_up).unwrap_or(false) { + continue; + } + + // Node is in a layout version, is not a gateway node, and is not up: + // it is in a failed state, add proper line to the output + let (host, last_seen) = match adv { + Some(adv) => ( + adv.hostname.as_deref().unwrap_or("?"), + adv.last_seen_secs_ago + .map(|s| tf.convert(Duration::from_secs(s))) + .unwrap_or_else(|| "never seen".into()), + ), + None => ("??", "never seen".into()), + }; + /* + let capacity = if ver.version == layout.current().version { + cfg.capacity_string() + } else { + drain_msg = true; + "draining metadata...".to_string() + }; + */ + let capacity = capacity_string(cfg.capacity); + + failed_nodes.push(format!( + "{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", + id = node, + host = host, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = capacity, + last_seen = last_seen, + )); + } + } + + if failed_nodes.len() > 1 { + println!("\n==== FAILED NODES ===="); + format_table(failed_nodes); + if drain_msg { + println!(); + println!("Your cluster is expecting to drain data from nodes that are currently unavailable."); + println!( + "If these nodes are definitely dead, please review the layout history with" + ); + println!( + "`garage layout history` and use `garage layout skip-dead-nodes` to force progress." + ); + } + } + + if print_staging_role_changes(&layout) { + println!(); + println!( + "Please use `garage layout show` to check the proposed new layout and apply it." + ); + println!(); + } + + Ok(()) + } + + pub async fn cmd_connect(&self, opt: ConnectNodeOpt) -> Result<(), Error> { + let res = self + .api_request(ConnectClusterNodesRequest(vec![opt.node])) + .await?; + if res.0.len() != 1 { + return Err(Error::Message(format!("unexpected response: {:?}", res))); + } + let res = res.0.into_iter().next().unwrap(); + if res.success { + println!("Success."); + Ok(()) + } else { + Err(Error::Message(format!( + "Failure: {}", + res.error.unwrap_or_default() + ))) + } + } +} diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs new file mode 100644 index 00000000..ccd1886f --- /dev/null +++ b/src/garage/cli_v2/layout.rs @@ -0,0 +1,119 @@ +use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api::admin::api::*; + +use crate::cli::layout as cli_v1; +use crate::cli::structs::*; +use crate::cli_v2::util::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> { + match cmd { + LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await, + + // TODO + LayoutOperation::Remove(remove_opt) => { + cli_v1::cmd_remove_role(&self.system_rpc_endpoint, self.rpc_host, remove_opt).await + } + LayoutOperation::Show => { + cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await + } + LayoutOperation::Apply(apply_opt) => { + cli_v1::cmd_apply_layout(&self.system_rpc_endpoint, self.rpc_host, apply_opt).await + } + LayoutOperation::Revert(revert_opt) => { + cli_v1::cmd_revert_layout(&self.system_rpc_endpoint, self.rpc_host, revert_opt) + .await + } + LayoutOperation::Config(config_opt) => { + cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt) + .await + } + LayoutOperation::History => { + cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await + } + LayoutOperation::SkipDeadNodes(assume_sync_opt) => { + cli_v1::cmd_layout_skip_dead_nodes( + &self.system_rpc_endpoint, + self.rpc_host, + assume_sync_opt, + ) + .await + } + } + } + + pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> { + let status = self.api_request(GetClusterStatusRequest).await?; + let layout = self.api_request(GetClusterLayoutRequest).await?; + + let all_node_ids_iter = status + .nodes + .iter() + .map(|x| x.id.as_str()) + .chain(layout.roles.iter().map(|x| x.id.as_str())); + + let mut actions = vec![]; + + for node in opt.replace.iter() { + let id = find_matching_node(all_node_ids_iter.clone(), &node)?; + + actions.push(NodeRoleChange { + id, + action: NodeRoleChangeEnum::Remove { remove: true }, + }); + } + + for node in opt.node_ids.iter() { + let id = find_matching_node(all_node_ids_iter.clone(), &node)?; + + let current = get_staged_or_current_role(&id, &layout); + + let zone = opt + .zone + .clone() + .or_else(|| current.as_ref().map(|c| c.zone.clone())) + .ok_or_message("Please specify a zone with the -z flag")?; + + let capacity = if opt.gateway { + if opt.capacity.is_some() { + return Err(Error::Message("Please specify only -c or -g".into())); + } + None + } else if let Some(cap) = opt.capacity { + Some(cap.as_u64()) + } else { + current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity + }; + + let tags = if !opt.tags.is_empty() { + opt.tags.clone() + } else if let Some(cur) = current.as_ref() { + cur.tags.clone() + } else { + vec![] + }; + + actions.push(NodeRoleChange { + id, + action: NodeRoleChangeEnum::Update { + zone, + capacity, + tags, + }, + }); + } + + self.api_request(UpdateClusterLayoutRequest(actions)) + .await?; + + println!("Role changes are staged but not yet committed."); + println!("Use `garage layout show` to view staged role changes,"); + println!("and `garage layout apply` to enact staged changes."); + Ok(()) + } +} diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 6cf068c6..2fe45e29 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -1,12 +1,15 @@ +pub mod util; + +pub mod cluster; +pub mod layout; + use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; -use format_table::format_table; use garage_util::error::*; -use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; @@ -14,7 +17,9 @@ use garage_api::admin::api::*; use garage_api::admin::EndpointHandler as AdminApiEndpoint; use crate::admin::*; -use crate::cli::*; +use crate::cli as cli_v1; +use crate::cli::structs::*; +use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc>, @@ -24,13 +29,64 @@ pub struct Cli { impl Cli { pub async fn handle(&self, cmd: Command) -> Result<(), Error> { - println!("{:?}", self.api_request(GetClusterStatusRequest).await?); - Ok(()) - /* match cmd { - _ => todo!(), + Command::Status => self.cmd_status().await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + self.cmd_connect(connect_opt).await + } + Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, + + // TODO + Command::Bucket(bo) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::BucketOperation(bo), + ) + .await + .ok_or_message("xoxo"), + Command::Key(ko) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::KeyOperation(ko), + ) + .await + .ok_or_message("xoxo"), + Command::Repair(ro) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::LaunchRepair(ro), + ) + .await + .ok_or_message("xoxo"), + Command::Stats(so) => { + cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so)) + .await + .ok_or_message("xoxo") + } + Command::Worker(wo) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::Worker(wo), + ) + .await + .ok_or_message("xoxo"), + Command::Block(bo) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::BlockOperation(bo), + ) + .await + .ok_or_message("xoxo"), + Command::Meta(mo) => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::MetaOperation(mo), + ) + .await + .ok_or_message("xoxo"), + + _ => unreachable!(), } - */ } pub async fn api_request(&self, req: T) -> Result<::Response, Error> diff --git a/src/garage/cli_v2/util.rs b/src/garage/cli_v2/util.rs new file mode 100644 index 00000000..78399b0d --- /dev/null +++ b/src/garage/cli_v2/util.rs @@ -0,0 +1,115 @@ +use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::Error; + +use garage_api::admin::api::*; + +pub fn capacity_string(v: Option) -> String { + match v { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } +} + +pub fn get_staged_or_current_role( + id: &str, + layout: &GetClusterLayoutResponse, +) -> Option { + for node in layout.staged_role_changes.iter() { + if node.id == id { + return match &node.action { + NodeRoleChangeEnum::Remove { .. } => None, + NodeRoleChangeEnum::Update { + zone, + capacity, + tags, + } => Some(NodeRoleResp { + id: id.to_string(), + zone: zone.to_string(), + capacity: *capacity, + tags: tags.clone(), + }), + }; + } + } + + for node in layout.roles.iter() { + if node.id == id { + return Some(node.clone()); + } + } + + None +} + +pub fn find_matching_node<'a>( + cand: impl std::iter::Iterator, + pattern: &'a str, +) -> Result { + let mut candidates = vec![]; + for c in cand { + if c.starts_with(pattern) && !candidates.contains(&c) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0].to_string()) + } +} + +pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool { + let has_role_changes = !layout.staged_role_changes.is_empty(); + + // TODO!! Layout parameters + let has_layout_changes = false; + + if has_role_changes || has_layout_changes { + println!(); + println!("==== STAGED ROLE CHANGES ===="); + if has_role_changes { + let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + for change in layout.staged_role_changes.iter() { + match &change.action { + NodeRoleChangeEnum::Update { + tags, + zone, + capacity, + } => { + let tags = tags.join(","); + table.push(format!( + "{:.16}\t{}\t{}\t{}", + change.id, + tags, + zone, + capacity_string(*capacity), + )); + } + NodeRoleChangeEnum::Remove { .. } => { + table.push(format!("{:.16}\tREMOVED", change.id)); + } + } + } + format_table(table); + println!(); + } + //TODO + /* + if has_layout_changes { + println!( + "Zone redundancy: {}", + staging.parameters.get().zone_redundancy + ); + } + */ + true + } else { + false + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index 8b5af5ea..08c7cee7 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -35,8 +35,6 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use garage_model::helper::error::Error as HelperError; - use admin::*; use cli::*; use secrets::Secrets;