forked from Deuxfleurs/garage
Compare commits
4 commits
main
...
bug/layout
Author | SHA1 | Date | |
---|---|---|---|
a74b6baa3b | |||
b19dd1a3a2 | |||
f2a504bf9b | |||
a0ec6c49be |
3 changed files with 99 additions and 60 deletions
|
@ -43,7 +43,11 @@ pub async fn cmd_assign_role(
|
||||||
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
||||||
};
|
};
|
||||||
|
|
||||||
let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
|
let added_nodes = args
|
||||||
|
.node_ids
|
||||||
|
.iter()
|
||||||
|
.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||||
|
|
||||||
|
@ -75,6 +79,7 @@ pub async fn cmd_assign_role(
|
||||||
return Err(Error::Message("Invalid capacity value: 0".into()));
|
return Err(Error::Message("Invalid capacity value: 0".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for added_node in added_nodes {
|
||||||
let new_entry = match roles.get(&added_node) {
|
let new_entry = match roles.get(&added_node) {
|
||||||
Some(NodeRoleV(Some(old))) => {
|
Some(NodeRoleV(Some(old))) => {
|
||||||
let capacity = match args.capacity {
|
let capacity = match args.capacity {
|
||||||
|
@ -85,10 +90,10 @@ pub async fn cmd_assign_role(
|
||||||
let tags = if args.tags.is_empty() {
|
let tags = if args.tags.is_empty() {
|
||||||
old.tags.clone()
|
old.tags.clone()
|
||||||
} else {
|
} else {
|
||||||
args.tags
|
args.tags.clone()
|
||||||
};
|
};
|
||||||
NodeRole {
|
NodeRole {
|
||||||
zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
|
zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
|
||||||
capacity,
|
capacity,
|
||||||
tags,
|
tags,
|
||||||
}
|
}
|
||||||
|
@ -101,9 +106,12 @@ pub async fn cmd_assign_role(
|
||||||
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
|
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
|
||||||
};
|
};
|
||||||
NodeRole {
|
NodeRole {
|
||||||
zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
|
zone: args
|
||||||
|
.zone
|
||||||
|
.clone()
|
||||||
|
.ok_or("Please specifiy a zone with the -z flag")?,
|
||||||
capacity,
|
capacity,
|
||||||
tags: args.tags,
|
tags: args.tags.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -111,10 +119,11 @@ pub async fn cmd_assign_role(
|
||||||
layout
|
layout
|
||||||
.staging
|
.staging
|
||||||
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
|
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
|
||||||
|
}
|
||||||
|
|
||||||
send_layout(rpc_cli, rpc_host, layout).await?;
|
send_layout(rpc_cli, rpc_host, layout).await?;
|
||||||
|
|
||||||
println!("Role change is staged but not yet commited.");
|
println!("Role changes are staged but not yet commited.");
|
||||||
println!("Use `garage layout show` to view staged role changes,");
|
println!("Use `garage layout show` to view staged role changes,");
|
||||||
println!("and `garage layout apply` to enact staged changes.");
|
println!("and `garage layout apply` to enact staged changes.");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -92,8 +92,9 @@ pub enum LayoutOperation {
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
pub struct AssignRoleOpt {
|
pub struct AssignRoleOpt {
|
||||||
/// Node to which to assign role (prefix of hexadecimal node id)
|
/// Node(s) to which to assign role (prefix of hexadecimal node id)
|
||||||
pub(crate) node_id: String,
|
#[structopt(required = true)]
|
||||||
|
pub(crate) node_ids: Vec<String>,
|
||||||
|
|
||||||
/// Location (zone or datacenter) of the node
|
/// Location (zone or datacenter) of the node
|
||||||
#[structopt(short = "z", long = "zone")]
|
#[structopt(short = "z", long = "zone")]
|
||||||
|
|
|
@ -172,30 +172,43 @@ impl ClusterLayout {
|
||||||
println!("Calculating updated partition assignation, this may take some time...");
|
println!("Calculating updated partition assignation, this may take some time...");
|
||||||
println!();
|
println!();
|
||||||
|
|
||||||
|
// Get old partition assignation
|
||||||
let old_partitions = self.parse_assignation_data();
|
let old_partitions = self.parse_assignation_data();
|
||||||
|
|
||||||
let mut partitions = old_partitions.clone();
|
// Start new partition assignation with nodes from old assignation where it is relevant
|
||||||
for part in partitions.iter_mut() {
|
let mut partitions = old_partitions
|
||||||
part.nodes
|
.iter()
|
||||||
.retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false));
|
.map(|old_part| {
|
||||||
|
let mut new_part = PartitionAss::new();
|
||||||
|
for node in old_part.nodes.iter() {
|
||||||
|
if let Some(role) = node.1 {
|
||||||
|
if role.capacity.is_some() {
|
||||||
|
new_part.add(None, n_zones, node.0, role);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new_part
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// When nodes are removed, or when bootstraping an assignation from
|
// In various cases, not enough nodes will have been added for all partitions
|
||||||
// scratch for a new cluster, the old partitions will have holes (or be empty).
|
// in the step above (e.g. due to node removals, or new zones being added).
|
||||||
// Here we add more nodes to make a complete (sub-optimal) assignation,
|
// Here we add more nodes to make a complete (but sub-optimal) assignation,
|
||||||
// using an initial partition assignation that is calculated using the multi-dc maglev trick
|
// using an initial partition assignation that is calculated using the multi-dc maglev trick
|
||||||
match self.initial_partition_assignation() {
|
match self.initial_partition_assignation() {
|
||||||
Some(initial_partitions) => {
|
Some(initial_partitions) => {
|
||||||
for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
|
for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
|
||||||
for (id, info) in ipart.nodes.iter() {
|
for (id, info) in ipart.nodes.iter() {
|
||||||
if part.nodes.len() < self.replication_factor {
|
if part.nodes.len() < self.replication_factor {
|
||||||
part.add(part.nodes.len() + 1, n_zones, id, info.unwrap());
|
part.add(None, n_zones, id, info.unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert!(part.nodes.len() == self.replication_factor);
|
assert!(part.nodes.len() == self.replication_factor);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
// Not enough nodes in cluster to build a correct assignation.
|
||||||
|
// Signal it by returning an error.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,8 +245,13 @@ impl ClusterLayout {
|
||||||
let mut option = None;
|
let mut option = None;
|
||||||
for (i, part) in partitions.iter_mut().enumerate() {
|
for (i, part) in partitions.iter_mut().enumerate() {
|
||||||
for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
|
for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
|
||||||
let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32
|
let errratio = |node, parts| {
|
||||||
- target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32;
|
let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
|
||||||
|
(parts - tgt) / tgt
|
||||||
|
};
|
||||||
|
let square = |x| x * x;
|
||||||
|
|
||||||
|
let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
|
||||||
|
|
||||||
for (idadd, infoadd) in configured_nodes.iter() {
|
for (idadd, infoadd) in configured_nodes.iter() {
|
||||||
// skip replacing a node by itself
|
// skip replacing a node by itself
|
||||||
|
@ -242,14 +260,12 @@ impl ClusterLayout {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32
|
|
||||||
- target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32;
|
|
||||||
|
|
||||||
// We want to try replacing node idrm by node idadd
|
// We want to try replacing node idrm by node idadd
|
||||||
// if that brings us close to our goal.
|
// if that brings us close to our goal.
|
||||||
let square = |i: i32| i * i;
|
let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
|
||||||
let oldcost = square(suprm) + square(supadd);
|
let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
|
||||||
let newcost = square(suprm - 1) + square(supadd + 1);
|
let newcost =
|
||||||
|
square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
|
||||||
if newcost >= oldcost {
|
if newcost >= oldcost {
|
||||||
// not closer to our goal
|
// not closer to our goal
|
||||||
continue;
|
continue;
|
||||||
|
@ -259,7 +275,7 @@ impl ClusterLayout {
|
||||||
let mut newpart = part.clone();
|
let mut newpart = part.clone();
|
||||||
|
|
||||||
newpart.nodes.remove(irm);
|
newpart.nodes.remove(irm);
|
||||||
if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) {
|
if !newpart.add(None, n_zones, idadd, infoadd) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
assert!(newpart.nodes.len() == self.replication_factor);
|
assert!(newpart.nodes.len() == self.replication_factor);
|
||||||
|
@ -302,7 +318,9 @@ impl ClusterLayout {
|
||||||
// Show statistics
|
// Show statistics
|
||||||
println!("New number of partitions per node:");
|
println!("New number of partitions per node:");
|
||||||
for (node, npart) in partitions_per_node.iter() {
|
for (node, npart) in partitions_per_node.iter() {
|
||||||
println!("{:?}\t{}", node, npart);
|
let tgt = *target_partitions_per_node.get(node).unwrap();
|
||||||
|
let pct = 100f32 * (*npart as f32) / (tgt as f32);
|
||||||
|
println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
|
||||||
}
|
}
|
||||||
println!();
|
println!();
|
||||||
|
|
||||||
|
@ -394,7 +412,7 @@ impl ClusterLayout {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
|
for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
|
||||||
if partitions[qv].add(rep + 1, n_zones, node_id, node_info) {
|
if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
|
||||||
remaining -= 1;
|
remaining -= 1;
|
||||||
*pos = pos2 + 1;
|
*pos = pos2 + 1;
|
||||||
break;
|
break;
|
||||||
|
@ -551,16 +569,27 @@ impl<'a> PartitionAss<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add is a key function in creating a PartitionAss, i.e. the list of nodes
|
||||||
|
// to which a partition is assigned. It tries to add a certain node id to the
|
||||||
|
// assignation, but checks that doing so is compatible with the NECESSARY
|
||||||
|
// condition that the partition assignation must be dispersed over different
|
||||||
|
// zones (datacenters) if enough zones exist. This is why it takes a n_zones
|
||||||
|
// parameter, which is the total number of zones that have existing nodes:
|
||||||
|
// if nodes in the assignation already cover all n_zones zones, then any node
|
||||||
|
// that is not yet in the assignation can be added. Otherwise, only nodes
|
||||||
|
// that are in a new zone can be added.
|
||||||
fn add(
|
fn add(
|
||||||
&mut self,
|
&mut self,
|
||||||
target_len: usize,
|
target_len: Option<usize>,
|
||||||
n_zones: usize,
|
n_zones: usize,
|
||||||
node: &'a Uuid,
|
node: &'a Uuid,
|
||||||
role: &'a NodeRole,
|
role: &'a NodeRole,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
if self.nodes.len() != target_len - 1 {
|
if let Some(tl) = target_len {
|
||||||
|
if self.nodes.len() != tl - 1 {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let p_zns = self
|
let p_zns = self
|
||||||
.nodes
|
.nodes
|
||||||
|
|
Loading…
Reference in a new issue