garage node configure --replace <old_node_id> <new_node_id>

This commit is contained in:
Alex 2021-03-18 21:49:12 +01:00
parent e19e267b7e
commit a1014224d3
1 changed files with 36 additions and 28 deletions

View File

@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::data::UUID;
use garage_util::time::*;
use garage_rpc::membership::*;
@ -82,6 +83,10 @@ pub struct ConfigureNodeOpt {
/// Optionnal node tag
#[structopt(short = "t", long = "tag")]
tag: Option<String>,
/// Replaced node(s): list of node IDs that will be removed from the current cluster
#[structopt(long = "replace")]
replace: Vec<String>,
}
#[derive(StructOpt, Debug)]
@ -379,6 +384,24 @@ pub async fn cmd_status(
Ok(())
}
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0])
}
}
pub async fn cmd_configure(
rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
@ -392,18 +415,7 @@ pub async fn cmd_configure(
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let mut candidates = vec![];
for adv in status.iter() {
if hex::encode(&adv.id).starts_with(&args.node_id) {
candidates.push(adv.id);
}
}
if candidates.len() != 1 {
return Err(Error::Message(format!(
"{} matching nodes",
candidates.len()
)));
}
let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?;
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
@ -413,7 +425,14 @@ pub async fn cmd_configure(
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let new_entry = match config.members.get(&candidates[0]) {
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
}
}
let new_entry = match config.members.get(&added_node) {
None => NetworkConfigEntry {
datacenter: args
.datacenter
@ -430,7 +449,7 @@ pub async fn cmd_configure(
},
};
config.members.insert(candidates[0].clone(), new_entry);
config.members.insert(added_node, new_entry);
config.version += 1;
rpc_cli
@ -456,27 +475,16 @@ pub async fn cmd_remove(
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let mut candidates = vec![];
for (key, _) in config.members.iter() {
if hex::encode(key).starts_with(&args.node_id) {
candidates.push(*key);
}
}
if candidates.len() != 1 {
return Err(Error::Message(format!(
"{} matching nodes",
candidates.len()
)));
}
let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
if !args.yes {
return Err(Error::Message(format!(
"Add the flag --yes to really remove {:?} from the cluster",
candidates[0]
deleted_node
)));
}
config.members.remove(&candidates[0]);
config.members.remove(&deleted_node);
config.version += 1;
rpc_cli