diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 0d9e4fa4..e76f7737 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -43,7 +43,11 @@ pub async fn cmd_assign_role(
 		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
 	};
 
-	let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
+	let added_nodes = args
+		.node_ids
+		.iter()
+		.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
+		.collect::<Result<Vec<_>, _>>()?;
 
 	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
 
@@ -75,46 +79,51 @@ pub async fn cmd_assign_role(
 		return Err(Error::Message("Invalid capacity value: 0".into()));
 	}
 
-	let new_entry = match roles.get(&added_node) {
-		Some(NodeRoleV(Some(old))) => {
-			let capacity = match args.capacity {
-				Some(c) => Some(c),
-				None if args.gateway => None,
-				None => old.capacity,
-			};
-			let tags = if args.tags.is_empty() {
-				old.tags.clone()
-			} else {
-				args.tags
-			};
-			NodeRole {
-				zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
-				capacity,
-				tags,
+	for added_node in added_nodes {
+		let new_entry = match roles.get(&added_node) {
+			Some(NodeRoleV(Some(old))) => {
+				let capacity = match args.capacity {
+					Some(c) => Some(c),
+					None if args.gateway => None,
+					None => old.capacity,
+				};
+				let tags = if args.tags.is_empty() {
+					old.tags.clone()
+				} else {
+					args.tags.clone()
+				};
+				NodeRole {
+					zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
+					capacity,
+					tags,
+				}
 			}
-		}
-		_ => {
-			let capacity = match args.capacity {
-				Some(c) => Some(c),
-				None if args.gateway => None,
-				None => return Err(Error::Message(
-					"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
-			};
-			NodeRole {
-				zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
-				capacity,
-				tags: args.tags,
+			_ => {
+				let capacity = match args.capacity {
+					Some(c) => Some(c),
+					None if args.gateway => None,
+					None => return Err(Error::Message(
+						"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+				};
+				NodeRole {
+					zone: args
+						.zone
+						.clone()
+						.ok_or("Please specify a zone with the -z flag")?,
+					capacity,
+					tags: args.tags.clone(),
+				}
 			}
-		}
-	};
+		};
 
-	layout
-		.staging
-		.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+		layout
+			.staging
+			.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+	}
 
 	send_layout(rpc_cli, rpc_host, layout).await?;
 
-	println!("Role change is staged but not yet commited.");
+	println!("Role changes are staged but not yet committed.");
 	println!("Use `garage layout show` to view staged role changes,");
 	println!("and `garage layout apply` to enact staged changes.");
 
 	Ok(())
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 30cbb2da..a0c49aeb 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -92,8 +92,9 @@ pub enum LayoutOperation {
 
 #[derive(StructOpt, Debug)]
 pub struct AssignRoleOpt {
-	/// Node to which to assign role (prefix of hexadecimal node id)
-	pub(crate) node_id: String,
+	/// Node(s) to which to assign role (prefix of hexadecimal node id)
+	#[structopt(required = true)]
+	pub(crate) node_ids: Vec<String>,
 
 	/// Location (zone or datacenter) of the node
 	#[structopt(short = "z", long = "zone")]
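The `.collect::<Result<Vec<_>, _>>()?` above makes the multi-node form all-or-nothing: the first prefix that fails to resolve aborts the command before any role change is staged. A minimal standalone sketch of that pattern, with a hypothetical `find_matching` helper and made-up node ids (not Garage's actual `find_matching_node`):

```rust
// Resolve several user-supplied prefixes; collecting into Result
// short-circuits on the first Err, so no partial batch is produced.
fn find_matching(ids: &[&str], prefix: &str) -> Result<String, String> {
    let mut hits = ids.iter().filter(|id| id.starts_with(prefix));
    match (hits.next(), hits.next()) {
        (Some(id), None) => Ok(id.to_string()),
        (None, _) => Err(format!("no node matches prefix {}", prefix)),
        (Some(_), Some(_)) => Err(format!("prefix {} is ambiguous", prefix)),
    }
}

fn main() {
    let known = ["63efefde", "12c4d7c9"];
    let ok: Result<Vec<_>, _> = ["63ef", "12c4"]
        .iter()
        .map(|p| find_matching(&known, p))
        .collect();
    assert_eq!(ok.unwrap(), vec!["63efefde", "12c4d7c9"]);
    let err: Result<Vec<_>, _> = ["63ef", "ff"]
        .iter()
        .map(|p| find_matching(&known, p))
        .collect();
    assert!(err.is_err()); // one bad prefix fails the whole batch
}
```

Resolving every id up front is what lets the `for added_node in added_nodes` loop assume all nodes are valid before it touches `layout.staging`.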
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 895dbf1c..b9c02c21 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -172,30 +172,43 @@ impl ClusterLayout {
 		println!("Calculating updated partition assignation, this may take some time...");
 		println!();
 
+		// Get old partition assignation
 		let old_partitions = self.parse_assignation_data();
 
-		let mut partitions = old_partitions.clone();
-		for part in partitions.iter_mut() {
-			part.nodes
-				.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
-		}
+		// Start new partition assignation with nodes from old assignation where it is relevant
+		let mut partitions = old_partitions
+			.iter()
+			.map(|old_part| {
+				let mut new_part = PartitionAss::new();
+				for node in old_part.nodes.iter() {
+					if let Some(role) = node.1 {
+						if role.capacity.is_some() {
+							new_part.add(None, n_zones, node.0, role);
+						}
+					}
+				}
+				new_part
+			})
+			.collect::<Vec<_>>();
 
-		// When nodes are removed, or when bootstraping an assignation from
-		// scratch for a new cluster, the old partitions will have holes (or be empty).
-		// Here we add more nodes to make a complete (sub-optimal) assignation,
+		// In various cases, not enough nodes will have been added for all partitions
+		// in the step above (e.g. due to node removals, or new zones being added).
+		// Here we add more nodes to make a complete (but sub-optimal) assignation,
 		// using an initial partition assignation that is calculated using the multi-dc maglev trick
 		match self.initial_partition_assignation() {
 			Some(initial_partitions) => {
 				for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
 					for (id, info) in ipart.nodes.iter() {
 						if part.nodes.len() < self.replication_factor {
-							part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
+							part.add(None, n_zones, id, info.unwrap());
 						}
 					}
 					assert!(part.nodes.len() == self.replication_factor);
 				}
 			}
 			None => {
+				// Not enough nodes in cluster to build a correct assignation.
+				// Signal it by returning false.
 				return false;
 			}
 		}
@@ -232,8 +245,13 @@ impl ClusterLayout {
 		let mut option = None;
 		for (i, part) in partitions.iter_mut().enumerate() {
 			for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
-				let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32
-					- target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32;
+				let errratio = |node, parts| {
+					let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
+					(parts - tgt) / tgt
+				};
+				let square = |x| x * x;
+
+				let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
 
 				for (idadd, infoadd) in configured_nodes.iter() {
 					// skip replacing a node by itself
 					if idadd == idrm {
 						continue;
 					}
 
-					let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
-						- target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
-
 					// We want to try replacing node idrm by node idadd
 					// if that brings us close to our goal.
-					let square = |i: i32| i * i;
-					let oldcost = square(suprm) + square(supadd);
-					let newcost = square(suprm - 1) + square(supadd + 1);
+					let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
+					let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
+					let newcost =
+						square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
 					if newcost >= oldcost {
 						// not closer to our goal
 						continue;
 					}
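Why switch the cost from absolute surpluses to relative error ratios? With heterogeneous capacities, being one partition over target is severe for a tiny node and negligible for a huge one, but `square(suprm) + square(supadd)` weighed both the same. A standalone illustration with made-up targets (not the actual `ClusterLayout` code):

```rust
// Node A: target 4 partitions, holds 5 (25% over target).
// Node B: target 40 partitions, holds exactly 40 (on target).
// The old absolute metric saw no improvement in moving a partition
// from A to B: 1*1 + 0*0 = 1 before vs 0*0 + 1*1 = 1 after.
fn main() {
    let errratio = |parts: f32, tgt: f32| (parts - tgt) / tgt;
    let square = |x: f32| x * x;

    let (parts_a, tgt_a) = (5.0_f32, 4.0_f32);
    let (parts_b, tgt_b) = (40.0_f32, 40.0_f32);

    let oldcost = square(errratio(parts_a, tgt_a) - errratio(parts_b, tgt_b));
    let newcost = square(errratio(parts_a - 1.0, tgt_a) - errratio(parts_b + 1.0, tgt_b));
    // Prints oldcost = 0.0625, newcost = 0.000625: the swap is now accepted.
    println!("oldcost = {}, newcost = {}", oldcost, newcost);
    assert!(newcost < oldcost);
}
```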
@@ -259,7 +275,7 @@
 
 					let mut newpart = part.clone();
 					newpart.nodes.remove(irm);
-					if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
+					if !newpart.add(None, n_zones, idadd, infoadd) {
 						continue;
 					}
 					assert!(newpart.nodes.len() == self.replication_factor);
@@ -302,7 +318,9 @@
 		// Show statistics
 		println!("New number of partitions per node:");
 		for (node, npart) in partitions_per_node.iter() {
-			println!("{:?}\t{}", node, npart);
+			let tgt = *target_partitions_per_node.get(node).unwrap();
+			let pct = 100f32 * (*npart as f32) / (tgt as f32);
+			println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
 		}
 		println!();
 
@@ -394,7 +412,7 @@
 					continue;
 				}
 				for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
-					if partitions[qv].add(rep + 1, n_zones, node_id, node_info) {
+					if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
 						remaining -= 1;
 						*pos = pos2 + 1;
 						break;
 					}
@@ -551,15 +569,26 @@ impl<'a> PartitionAss<'a> {
 		}
 	}
 
+	// add is a key function in creating a PartitionAss, i.e. the list of nodes
+	// to which a partition is assigned. It tries to add a certain node id to the
+	// assignation, but checks that doing so is compatible with the NECESSARY
+	// condition that the partition assignation must be dispersed over different
+	// zones (datacenters) if enough zones exist. This is why it takes an n_zones
+	// parameter, which is the total number of zones that have existing nodes:
+	// if nodes in the assignation already cover all n_zones zones, then any node
+	// that is not yet in the assignation can be added. Otherwise, only nodes
+	// that are in a new zone can be added.
 	fn add(
 		&mut self,
-		target_len: usize,
+		target_len: Option<usize>,
 		n_zones: usize,
 		node: &'a Uuid,
 		role: &'a NodeRole,
 	) -> bool {
-		if self.nodes.len() != target_len - 1 {
-			return false;
+		if let Some(tl) = target_len {
+			if self.nodes.len() != tl - 1 {
+				return false;
+			}
 		}
 
 		let p_zns = self
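A toy model of the dispersal rule described in the doc comment above `add` — hypothetical simplified types, zones as strings, not the real `PartitionAss`:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for PartitionAss: nodes are (id, zone) pairs.
struct Part<'a> {
    nodes: Vec<(&'a str, &'a str)>,
}

impl<'a> Part<'a> {
    // Accept a node if all n_zones zones are already covered,
    // or if it lives in a zone not yet present in the assignation.
    fn add(&mut self, n_zones: usize, node: &'a str, zone: &'a str) -> bool {
        let zones: HashSet<&str> = self.nodes.iter().map(|(_, z)| *z).collect();
        let zone_ok = zones.len() == n_zones || !zones.contains(zone);
        let duplicate = self.nodes.iter().any(|(n, _)| *n == node);
        if zone_ok && !duplicate {
            self.nodes.push((node, zone));
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut part = Part { nodes: vec![] };
    assert!(part.add(2, "n1", "dc1"));
    assert!(!part.add(2, "n2", "dc1")); // dc2 not covered yet: same-zone node refused
    assert!(part.add(2, "n3", "dc2")); // new zone: accepted
    assert!(part.add(2, "n4", "dc1")); // both zones covered: any new node is fine
}
```

The real `add` layers the `target_len: Option<usize>` guard on top of this: `Some(rep + 1)` in the maglev-based initial assignation insists the node lands at a precise position in the assignation, while the `None` callers introduced by this patch only care that the zone constraint holds.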