From 413ab0eaedb12b8808897098263cefa4bc7a7663 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 10 Feb 2022 16:10:21 +0100
Subject: [PATCH 1/4] Small change to partition assignation algorithm

This change helps ensure that nodes for each partition are spread
over all datacenters, a property that wasn't ensured previously
when going from a 2 DC deployment to a 3 DC deployment
---
 src/garage/cli/layout.rs | 18 +++++++++---------
 src/rpc/layout.rs        | 28 ++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 0d9e4fa4..e0aba1d1 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -196,6 +196,15 @@ pub async fn cmd_apply_layout(
 ) -> Result<(), Error> {
 	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
 
+	layout.roles.merge(&layout.staging);
+
+	if !layout.calculate_partition_assignation() {
+		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+	}
+
+	layout.staging.clear();
+	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
 	match apply_opt.version {
 		None => {
 			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
@@ -209,15 +218,6 @@ pub async fn cmd_apply_layout(
 		}
 	}
 
-	layout.roles.merge(&layout.staging);
-
-	if !layout.calculate_partition_assignation() {
-		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
-	}
-
-	layout.staging.clear();
-	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
 	layout.version += 1;
 
 	send_layout(rpc_cli, rpc_host, layout).await?;

diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 895dbf1c..a24bd9f3 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -172,12 +172,38 @@ impl ClusterLayout {
 		println!("Calculating updated partition assignation, this may take some time...");
 		println!();
 
+		// Get old partition assignation
 		let old_partitions = self.parse_assignation_data();
 
+		// Create new partition assignation starting from old one
 		let mut partitions = old_partitions.clone();
+
+		// Cleanup steps in new partition assignation:
+		let min_keep_nodes_per_part = (self.replication_factor + 1) / 2;
 		for part in partitions.iter_mut() {
+			// - remove from assignation nodes that don't have a role in the layout anymore
 			part.nodes
 				.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
+
+			// - remove from assignation some nodes that are in the same datacenter
+			// if we can, so that the later steps can ensure datacenter variety
+			// as much as possible (but still under the constraint that each partition
+			// should not move from at least a certain number of nodes that is
+			// min_keep_nodes_per_part)
+			'rmloop: while part.nodes.len() > min_keep_nodes_per_part {
+				let mut zns_c = HashMap::<&str, usize>::new();
+				for (_id, info) in part.nodes.iter() {
+					*zns_c.entry(info.unwrap().zone.as_str()).or_insert(0) += 1;
+				}
+				for i in 0..part.nodes.len() {
+					if zns_c[part.nodes[i].1.unwrap().zone.as_str()] > 1 {
+						part.nodes.remove(i);
+						continue 'rmloop;
+					}
+				}
+
+				break;
+			}
 		}
 
 		// When nodes are removed, or when bootstraping an assignation from
@@ -196,6 +222,8 @@ impl ClusterLayout {
 				}
 			}
 			None => {
+				// Not enough nodes in cluster to build a correct assignation.
+				// Signal it by returning an error.
 				return false;
 			}
 		}
-- 
2.43.0


From 7e0e2ffda282a923ec58295942b5c8a05e5645be Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 16 Mar 2022 14:42:42 +0100
Subject: [PATCH 2/4] Slight change and add comment to layout assignation algo

---
 src/rpc/layout.rs | 70 ++++++++++++++++++++++-------------------------
 1 file changed, 33 insertions(+), 37 deletions(-)

diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index a24bd9f3..73b356ad 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -175,47 +175,32 @@ impl ClusterLayout {
 		// Get old partition assignation
 		let old_partitions = self.parse_assignation_data();
 
-		// Create new partition assignation starting from old one
-		let mut partitions = old_partitions.clone();
-
-		// Cleanup steps in new partition assignation:
-		let min_keep_nodes_per_part = (self.replication_factor + 1) / 2;
-		for part in partitions.iter_mut() {
-			// - remove from assignation nodes that don't have a role in the layout anymore
-			part.nodes
-				.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
-
-			// - remove from assignation some nodes that are in the same datacenter
-			// if we can, so that the later steps can ensure datacenter variety
-			// as much as possible (but still under the constraint that each partition
-			// should not move from at least a certain number of nodes that is
-			// min_keep_nodes_per_part)
-			'rmloop: while part.nodes.len() > min_keep_nodes_per_part {
-				let mut zns_c = HashMap::<&str, usize>::new();
-				for (_id, info) in part.nodes.iter() {
-					*zns_c.entry(info.unwrap().zone.as_str()).or_insert(0) += 1;
-				}
-				for i in 0..part.nodes.len() {
-					if zns_c[part.nodes[i].1.unwrap().zone.as_str()] > 1 {
-						part.nodes.remove(i);
-						continue 'rmloop;
+		// Start new partition assignation with nodes from old assignation where it is relevant
+		let mut partitions = old_partitions
+			.iter()
+			.map(|old_part| {
+				let mut new_part = PartitionAss::new();
+				for node in old_part.nodes.iter() {
+					if let Some(role) = node.1 {
+						if role.capacity.is_some() {
+							new_part.add(None, n_zones, node.0, role);
+						}
 					}
 				}
+				new_part
+			})
+			.collect::<Vec<_>>();
 
-				break;
-			}
-		}
-
-		// When nodes are removed, or when bootstraping an assignation from
-		// scratch for a new cluster, the old partitions will have holes (or be empty).
-		// Here we add more nodes to make a complete (sub-optimal) assignation,
+		// In various cases, not enough nodes will have been added for all partitions
+		// in the step above (e.g. due to node removals, or new zones being added).
+		// Here we add more nodes to make a complete (but sub-optimal) assignation,
 		// using an initial partition assignation that is calculated using the multi-dc maglev trick
 		match self.initial_partition_assignation() {
 			Some(initial_partitions) => {
 				for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
 					for (id, info) in ipart.nodes.iter() {
 						if part.nodes.len() < self.replication_factor {
-							part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
+							part.add(None, n_zones, id, info.unwrap());
 						}
 					}
 					assert!(part.nodes.len() == self.replication_factor);
@@ -287,7 +272,7 @@ impl ClusterLayout {
 				let mut newpart = part.clone();
 				newpart.nodes.remove(irm);
 
-				if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
+				if !newpart.add(None, n_zones, idadd, infoadd) {
 					continue;
 				}
 				assert!(newpart.nodes.len() == self.replication_factor);
@@ -422,7 +407,7 @@ impl ClusterLayout {
 					continue;
 				}
 				for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
-					if partitions[qv].add(rep + 1, n_zones, node_id, node_info) {
+					if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
 						remaining -= 1;
 						*pos = pos2 + 1;
 						break;
@@ -579,15 +564,26 @@ impl<'a> PartitionAss<'a> {
 		}
 	}
 
+	// add is a key function in creating a PartitionAss, i.e. the list of nodes
+	// to which a partition is assigned. It tries to add a certain node id to the
+	// assignation, but checks that doing so is compatible with the NECESSARY
+	// condition that the partition assignation must be dispersed over different
+	// zones (datacenters) if enough zones exist. This is why it takes a n_zones
+	// parameter, which is the total number of zones that have existing nodes:
+	// if nodes in the assignation already cover all n_zones zones, then any node
+	// that is not yet in the assignation can be added. Otherwise, only nodes
+	// that are in a new zone can be added.
 	fn add(
 		&mut self,
-		target_len: usize,
+		target_len: Option<usize>,
 		n_zones: usize,
 		node: &'a Uuid,
 		role: &'a NodeRole,
 	) -> bool {
-		if self.nodes.len() != target_len - 1 {
-			return false;
+		if let Some(tl) = target_len {
+			if self.nodes.len() != tl - 1 {
+				return false;
+			}
 		}
 
 		let p_zns = self
-- 
2.43.0


From 2814d41842bd48b0015c5ae000a61113b7d806d7 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 16 Mar 2022 14:43:04 +0100
Subject: [PATCH 3/4] Allow `garage layout assign` to assign to several nodes at once

---
 src/garage/cli/layout.rs  | 97 +++++++++++++++++++++------------------
 src/garage/cli/structs.rs |  5 +-
 2 files changed, 56 insertions(+), 46 deletions(-)

diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index e0aba1d1..e76f7737 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -43,7 +43,11 @@ pub async fn cmd_assign_role(
 		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
 	};
 
-	let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
+	let added_nodes = args
+		.node_ids
+		.iter()
+		.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
+		.collect::<Result<Vec<_>, _>>()?;
 
 	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
 
@@ -75,46 +79,51 @@ pub async fn cmd_assign_role(
 		return Err(Error::Message("Invalid capacity value: 0".into()));
 	}
 
-	let new_entry = match roles.get(&added_node) {
-		Some(NodeRoleV(Some(old))) => {
-			let capacity = match args.capacity {
-				Some(c) => Some(c),
-				None if args.gateway => None,
-				None => old.capacity,
-			};
-			let tags = if args.tags.is_empty() {
-				old.tags.clone()
-			} else {
-				args.tags
-			};
-			NodeRole {
-				zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
-				capacity,
-				tags,
+	for added_node in added_nodes {
+		let new_entry = match roles.get(&added_node) {
+			Some(NodeRoleV(Some(old))) => {
+				let capacity = match args.capacity {
+					Some(c) => Some(c),
+					None if args.gateway => None,
+					None => old.capacity,
+				};
+				let tags = if args.tags.is_empty() {
+					old.tags.clone()
+				} else {
+					args.tags.clone()
+				};
+				NodeRole {
+					zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
+					capacity,
+					tags,
+				}
 			}
-		}
-		_ => {
-			let capacity = match args.capacity {
-				Some(c) => Some(c),
-				None if args.gateway => None,
-				None => return Err(Error::Message(
-					"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
-			};
-			NodeRole {
-				zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
-				capacity,
-				tags: args.tags,
+			_ => {
+				let capacity = match args.capacity {
+					Some(c) => Some(c),
+					None if args.gateway => None,
+					None => return Err(Error::Message(
+						"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+				};
+				NodeRole {
+					zone: args
+						.zone
+						.clone()
+						.ok_or("Please specifiy a zone with the -z flag")?,
+					capacity,
+					tags: args.tags.clone(),
+				}
 			}
-		}
-	};
+		};
 
-	layout
-		.staging
-		.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+		layout
+			.staging
+			.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
+	}
 
 	send_layout(rpc_cli, rpc_host, layout).await?;
 
-	println!("Role change is staged but not yet commited.");
+	println!("Role changes are staged but not yet commited.");
 	println!("Use `garage layout show` to view staged role changes,");
 	println!("and `garage layout apply` to enact staged changes.");
 	Ok(())
@@ -196,15 +205,6 @@ pub async fn cmd_apply_layout(
 ) -> Result<(), Error> {
 	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
 
-	layout.roles.merge(&layout.staging);
-
-	if !layout.calculate_partition_assignation() {
-		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
-	}
-
-	layout.staging.clear();
-	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
 	match apply_opt.version {
 		None => {
 			println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
@@ -218,6 +218,15 @@ pub async fn cmd_apply_layout(
 		}
 	}
 
+	layout.roles.merge(&layout.staging);
+
+	if !layout.calculate_partition_assignation() {
+		return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
+	}
+
+	layout.staging.clear();
+	layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+
 	layout.version += 1;
 
 	send_layout(rpc_cli, rpc_host, layout).await?;

diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 30cbb2da..a0c49aeb 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -92,8 +92,9 @@ pub enum LayoutOperation {
 
 #[derive(StructOpt, Debug)]
 pub struct AssignRoleOpt {
-	/// Node to which to assign role (prefix of hexadecimal node id)
-	pub(crate) node_id: String,
+	/// Node(s) to which to assign role (prefix of hexadecimal node id)
+	#[structopt(required = true)]
+	pub(crate) node_ids: Vec<String>,
 
 	/// Location (zone or datacenter) of the node
 	#[structopt(short = "z", long = "zone")]
-- 
2.43.0


From 509d256c58ccb1aa0041569556465908453976b3 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 17 Mar 2022 16:42:10 +0100
Subject: [PATCH 4/4] Make layout optimization work in relative terms

---
 src/rpc/layout.rs | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 73b356ad..b9c02c21 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -245,8 +245,13 @@ impl ClusterLayout {
 		let mut option = None;
 		for (i, part) in partitions.iter_mut().enumerate() {
 			for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
-				let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32
-					- target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32;
+				let errratio = |node, parts| {
+					let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
+					(parts - tgt) / tgt
+				};
+				let square = |x| x * x;
+
+				let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
 
 				for (idadd, infoadd) in configured_nodes.iter() {
 					// skip replacing a node by itself
@@ -255,14 +260,12 @@ impl ClusterLayout {
 						continue;
 					}
-					let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
-						- target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
-
 					// We want to try replacing node idrm by node idadd
 					// if that brings us close to our goal.
-					let square = |i: i32| i * i;
-					let oldcost = square(suprm) + square(supadd);
-					let newcost = square(suprm - 1) + square(supadd + 1);
+					let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
+					let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
+					let newcost =
+						square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
 					if newcost >= oldcost {
 						// not closer to our goal
 						continue;
 					}
@@ -315,7 +318,9 @@ impl ClusterLayout {
 		// Show statistics
 		println!("New number of partitions per node:");
 		for (node, npart) in partitions_per_node.iter() {
-			println!("{:?}\t{}", node, npart);
+			let tgt = *target_partitions_per_node.get(node).unwrap();
+			let pct = 100f32 * (*npart as f32) / (tgt as f32);
+			println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
 		}
 		println!();
 
-- 
2.43.0
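
A note on the dispersal condition that patch 2/4 documents on PartitionAss::add(): it boils down to a single check. The standalone Rust sketch below models only that rule; can_add, assigned_zones and n_zones are illustrative names for this note, not Garage's actual API, and the real method additionally honours the optional target_len argument and refuses nodes that are already part of the assignation.

use std::collections::HashSet;

// A candidate node is acceptable either if every existing zone is already
// represented in the partition's assignation, or if it brings a zone that
// is not represented yet.
fn can_add(assigned_zones: &[&str], candidate_zone: &str, n_zones: usize) -> bool {
    let zones: HashSet<&str> = assigned_zones.iter().copied().collect();
    zones.len() >= n_zones || !zones.contains(candidate_zone)
}

fn main() {
    // Three zones exist in the cluster; the partition currently sits in dc1 and dc2.
    assert!(can_add(&["dc1", "dc2"], "dc3", 3)); // brings a new zone: accepted
    assert!(!can_add(&["dc1", "dc2"], "dc1", 3)); // dc1 already covered: refused
    // Once all three zones are covered, any additional node is acceptable.
    assert!(can_add(&["dc1", "dc2", "dc3"], "dc2", 3));
    println!("zone dispersal rule ok");
}

This is also why patch 1/4 trims same-zone duplicates from each partition before refilling it: once an assignation spans fewer zones than it could, the rule above lets the refilling steps pick replacements in the missing datacenters.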
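
Patch 4/4 replaces the absolute-surplus cost (square(suprm) + square(supadd)) with a cost expressed as relative error against each node's target number of partitions. The following is a minimal standalone sketch of that criterion, using illustrative names (err_ratio, swap_improves) rather than Garage's actual code:

fn err_ratio(parts: f32, target: f32) -> f32 {
    (parts - target) / target
}

// Would moving one partition from the "rm" node to the "add" node bring the
// two nodes closer together in relative terms?
fn swap_improves(parts_rm: f32, tgt_rm: f32, parts_add: f32, tgt_add: f32) -> bool {
    let square = |x: f32| x * x;
    let oldcost = square(err_ratio(parts_rm, tgt_rm) - err_ratio(parts_add, tgt_add));
    let newcost = square(err_ratio(parts_rm - 1.0, tgt_rm) - err_ratio(parts_add + 1.0, tgt_add));
    newcost < oldcost
}

fn main() {
    // A small node (target 32) holding 4 partitions too many and a large node
    // (target 256) holding 4 too few have equal absolute surpluses, but the
    // small node is much further from its goal in relative terms, so the move
    // is accepted.
    assert!(swap_improves(36.0, 32.0, 252.0, 256.0));
    // Two nodes exactly on target: no move can improve the situation.
    assert!(!swap_improves(32.0, 32.0, 32.0, 32.0));
    println!("relative-error criterion ok");
}

Working in ratios rather than raw surpluses is what lets the optimization behave sensibly when nodes have very different capacities, and it matches the revised statistics output, which now prints each node's partition count as a percentage of its target.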