Work on the partition assignation algorithm #266

Merged
lx merged 4 commits from bug/layout into main 2022-03-24 14:30:08 +00:00
3 changed files with 99 additions and 60 deletions

View file

@ -43,7 +43,11 @@ pub async fn cmd_assign_role(
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}; };
let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; let added_nodes = args
.node_ids
.iter()
.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
.collect::<Result<Vec<_>, _>>()?;
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
@ -75,46 +79,51 @@ pub async fn cmd_assign_role(
return Err(Error::Message("Invalid capacity value: 0".into())); return Err(Error::Message("Invalid capacity value: 0".into()));
} }
let new_entry = match roles.get(&added_node) { for added_node in added_nodes {
Some(NodeRoleV(Some(old))) => { let new_entry = match roles.get(&added_node) {
let capacity = match args.capacity { Some(NodeRoleV(Some(old))) => {
Some(c) => Some(c), let capacity = match args.capacity {
None if args.gateway => None, Some(c) => Some(c),
None => old.capacity, None if args.gateway => None,
}; None => old.capacity,
let tags = if args.tags.is_empty() { };
old.tags.clone() let tags = if args.tags.is_empty() {
} else { old.tags.clone()
args.tags } else {
}; args.tags.clone()
NodeRole { };
zone: args.zone.unwrap_or_else(|| old.zone.to_string()), NodeRole {
capacity, zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
tags, capacity,
tags,
}
} }
} _ => {
_ => { let capacity = match args.capacity {
let capacity = match args.capacity { Some(c) => Some(c),
Some(c) => Some(c), None if args.gateway => None,
None if args.gateway => None, None => return Err(Error::Message(
None => return Err(Error::Message( "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), };
}; NodeRole {
NodeRole { zone: args
zone: args.zone.ok_or("Please specify a zone with the -z flag")?, .zone
capacity, .clone()
tags: args.tags, .ok_or("Please specify a zone with the -z flag")?,
capacity,
tags: args.tags.clone(),
}
} }
} };
};
layout layout
.staging .staging
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;
println!("Role change is staged but not yet committed."); println!("Role changes are staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,"); println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes."); println!("and `garage layout apply` to enact staged changes.");
Ok(()) Ok(())

View file

@ -92,8 +92,9 @@ pub enum LayoutOperation {
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
pub struct AssignRoleOpt { pub struct AssignRoleOpt {
/// Node to which to assign role (prefix of hexadecimal node id) /// Node(s) to which to assign role (prefix of hexadecimal node id)
pub(crate) node_id: String, #[structopt(required = true)]
pub(crate) node_ids: Vec<String>,
/// Location (zone or datacenter) of the node /// Location (zone or datacenter) of the node
#[structopt(short = "z", long = "zone")] #[structopt(short = "z", long = "zone")]

View file

@ -172,30 +172,43 @@ impl ClusterLayout {
println!("Calculating updated partition assignation, this may take some time..."); println!("Calculating updated partition assignation, this may take some time...");
println!(); println!();
// Get old partition assignation
let old_partitions = self.parse_assignation_data(); let old_partitions = self.parse_assignation_data();
let mut partitions = old_partitions.clone(); // Start new partition assignation with nodes from old assignation where it is relevant
for part in partitions.iter_mut() { let mut partitions = old_partitions
part.nodes .iter()
.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false)); .map(|old_part| {
} let mut new_part = PartitionAss::new();
for node in old_part.nodes.iter() {
if let Some(role) = node.1 {
if role.capacity.is_some() {
new_part.add(None, n_zones, node.0, role);
}
}
}
new_part
})
.collect::<Vec<_>>();
// When nodes are removed, or when bootstrapping an assignation from // In various cases, not enough nodes will have been added for all partitions
// scratch for a new cluster, the old partitions will have holes (or be empty). // in the step above (e.g. due to node removals, or new zones being added).
// Here we add more nodes to make a complete (sub-optimal) assignation, // Here we add more nodes to make a complete (but sub-optimal) assignation,
// using an initial partition assignation that is calculated using the multi-dc maglev trick // using an initial partition assignation that is calculated using the multi-dc maglev trick
match self.initial_partition_assignation() { match self.initial_partition_assignation() {
Some(initial_partitions) => { Some(initial_partitions) => {
for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
for (id, info) in ipart.nodes.iter() { for (id, info) in ipart.nodes.iter() {
if part.nodes.len() < self.replication_factor { if part.nodes.len() < self.replication_factor {
part.add(part.nodes.len() + 1, n_zones, id, info.unwrap()); part.add(None, n_zones, id, info.unwrap());
} }
} }
assert!(part.nodes.len() == self.replication_factor); assert!(part.nodes.len() == self.replication_factor);
} }
} }
None => { None => {
// Not enough nodes in cluster to build a correct assignation.
// Signal it by returning an error.
return false; return false;
} }
} }
@ -232,8 +245,13 @@ impl ClusterLayout {
let mut option = None; let mut option = None;
for (i, part) in partitions.iter_mut().enumerate() { for (i, part) in partitions.iter_mut().enumerate() {
for (irm, (idrm, _)) in part.nodes.iter().enumerate() { for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32 let errratio = |node, parts| {
- target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32; let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
(parts - tgt) / tgt
};
let square = |x| x * x;
let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
for (idadd, infoadd) in configured_nodes.iter() { for (idadd, infoadd) in configured_nodes.iter() {
// skip replacing a node by itself // skip replacing a node by itself
@ -242,14 +260,12 @@ impl ClusterLayout {
continue; continue;
} }
let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
- target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
// We want to try replacing node idrm by node idadd // We want to try replacing node idrm by node idadd
// if that brings us close to our goal. // if that brings us close to our goal.
let square = |i: i32| i * i; let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
let oldcost = square(suprm) + square(supadd); let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
let newcost = square(suprm - 1) + square(supadd + 1); let newcost =
square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
if newcost >= oldcost { if newcost >= oldcost {
// not closer to our goal // not closer to our goal
continue; continue;
@ -259,7 +275,7 @@ impl ClusterLayout {
let mut newpart = part.clone(); let mut newpart = part.clone();
newpart.nodes.remove(irm); newpart.nodes.remove(irm);
if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) { if !newpart.add(None, n_zones, idadd, infoadd) {
continue; continue;
} }
assert!(newpart.nodes.len() == self.replication_factor); assert!(newpart.nodes.len() == self.replication_factor);
@ -302,7 +318,9 @@ impl ClusterLayout {
// Show statistics // Show statistics
println!("New number of partitions per node:"); println!("New number of partitions per node:");
for (node, npart) in partitions_per_node.iter() { for (node, npart) in partitions_per_node.iter() {
println!("{:?}\t{}", node, npart); let tgt = *target_partitions_per_node.get(node).unwrap();
let pct = 100f32 * (*npart as f32) / (tgt as f32);
println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
} }
println!(); println!();
@ -394,7 +412,7 @@ impl ClusterLayout {
continue; continue;
} }
for (pos2, &qv) in q.iter().enumerate().skip(*pos) { for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
if partitions[qv].add(rep + 1, n_zones, node_id, node_info) { if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
remaining -= 1; remaining -= 1;
*pos = pos2 + 1; *pos = pos2 + 1;
break; break;
@ -551,15 +569,26 @@ impl<'a> PartitionAss<'a> {
} }
} }
// add is a key function in creating a PartitionAss, i.e. the list of nodes
// to which a partition is assigned. It tries to add a certain node id to the
// assignation, but checks that doing so is compatible with the NECESSARY
// condition that the partition assignation must be dispersed over different
// zones (datacenters) if enough zones exist. This is why it takes a n_zones
// parameter, which is the total number of zones that have existing nodes:
// if nodes in the assignation already cover all n_zones zones, then any node
// that is not yet in the assignation can be added. Otherwise, only nodes
// that are in a new zone can be added.
fn add( fn add(
&mut self, &mut self,
target_len: usize, target_len: Option<usize>,
n_zones: usize, n_zones: usize,
node: &'a Uuid, node: &'a Uuid,
role: &'a NodeRole, role: &'a NodeRole,
) -> bool { ) -> bool {
if self.nodes.len() != target_len - 1 { if let Some(tl) = target_len {
return false; if self.nodes.len() != tl - 1 {
return false;
}
} }
let p_zns = self let p_zns = self