diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 21bafebd..886cf384 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use structopt::StructOpt; use garage_util::error::Error; +use garage_util::data::UUID; use garage_util::time::*; use garage_rpc::membership::*; @@ -82,6 +83,10 @@ pub struct ConfigureNodeOpt { /// Optionnal node tag #[structopt(short = "t", long = "tag")] tag: Option, + + /// Replaced node(s): list of node IDs that will be removed from the current cluster + #[structopt(long = "replace")] + replace: Vec, } #[derive(StructOpt, Debug)] @@ -379,6 +384,24 @@ pub async fn cmd_status( Ok(()) } +pub fn find_matching_node(cand: impl std::iter::Iterator, pattern: &str) -> Result { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} + pub async fn cmd_configure( rpc_cli: RpcAddrClient, rpc_host: SocketAddr, @@ -392,18 +415,7 @@ pub async fn cmd_configure( resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let mut candidates = vec![]; - for adv in status.iter() { - if hex::encode(&adv.id).starts_with(&args.node_id) { - candidates.push(adv.id); - } - } - if candidates.len() != 1 { - return Err(Error::Message(format!( - "{} matching nodes", - candidates.len() - ))); - } + let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?; let mut config = match rpc_cli .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) @@ -413,7 +425,14 @@ pub async fn cmd_configure( resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let new_entry = match config.members.get(&candidates[0]) { + for replaced in args.replace.iter() { + let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; + if config.members.remove(&replaced_node).is_none() { + return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node))); + } + } + + let new_entry = match config.members.get(&added_node) { None => NetworkConfigEntry { datacenter: args .datacenter @@ -430,7 +449,7 @@ pub async fn cmd_configure( }, }; - config.members.insert(candidates[0].clone(), new_entry); + config.members.insert(added_node, new_entry); config.version += 1; rpc_cli @@ -456,27 +475,16 @@ pub async fn cmd_remove( resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let mut candidates = vec![]; - for (key, _) in config.members.iter() { - if hex::encode(key).starts_with(&args.node_id) { - candidates.push(*key); - } - } - if candidates.len() != 1 { - return Err(Error::Message(format!( - "{} matching nodes", - candidates.len() - ))); - } + let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; if !args.yes { return Err(Error::Message(format!( "Add the flag --yes to really remove {:?} from the cluster", - candidates[0] + deleted_node ))); } - config.members.remove(&candidates[0]); + config.members.remove(&deleted_node); config.version += 1; rpc_cli