2022-05-01 07:57:05 +00:00
|
|
|
use std::cmp::min;
|
2021-11-09 11:24:04 +00:00
|
|
|
use std::cmp::Ordering;
|
2022-05-01 07:57:05 +00:00
|
|
|
use std::collections::HashMap;
|
2021-11-09 11:24:04 +00:00
|
|
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
use garage_util::bipartite::*;
|
2021-11-09 11:24:04 +00:00
|
|
|
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
|
|
|
|
use garage_util::data::*;
|
2022-05-01 07:54:19 +00:00
|
|
|
|
|
|
|
use rand::prelude::SliceRandom;
|
2021-11-09 11:24:04 +00:00
|
|
|
|
|
|
|
use crate::ring::*;
|
|
|
|
|
|
|
|
/// The layout of the cluster, i.e. the list of roles
|
|
|
|
/// which are assigned to each cluster node
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
|
|
pub struct ClusterLayout {
|
|
|
|
pub version: u64,
|
|
|
|
|
|
|
|
pub replication_factor: usize,
|
|
|
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
|
|
|
|
|
|
|
/// node_id_vec: a vector of node IDs with a role assigned
|
|
|
|
/// in the system (this includes gateway nodes).
|
|
|
|
/// The order here is different than the vec stored by `roles`, because:
|
|
|
|
/// 1. non-gateway nodes are first so that they have lower numbers
|
|
|
|
/// 2. nodes that don't have a role are excluded (but they need to
|
|
|
|
/// stay in the CRDT as tombstones)
|
|
|
|
pub node_id_vec: Vec<Uuid>,
|
|
|
|
/// the assignation of data partitions to node, the values
|
|
|
|
/// are indices in node_id_vec
|
|
|
|
#[serde(with = "serde_bytes")]
|
|
|
|
pub ring_assignation_data: Vec<CompactNodeType>,
|
|
|
|
|
|
|
|
/// Role changes which are staged for the next version of the layout
|
|
|
|
pub staging: LwwMap<Uuid, NodeRoleV>,
|
|
|
|
pub staging_hash: Hash,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
|
|
|
pub struct NodeRoleV(pub Option<NodeRole>);
|
|
|
|
|
|
|
|
impl AutoCrdt for NodeRoleV {
|
|
|
|
const WARN_IF_DIFFERENT: bool = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// The user-assigned roles of cluster nodes
|
|
|
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
|
|
|
pub struct NodeRole {
|
|
|
|
/// Datacenter at which this entry belong. This information might be used to perform a better
|
|
|
|
/// geodistribution
|
|
|
|
pub zone: String,
|
|
|
|
/// The (relative) capacity of the node
|
|
|
|
/// If this is set to None, the node does not participate in storing data for the system
|
|
|
|
/// and is only active as an API gateway to other nodes
|
|
|
|
pub capacity: Option<u32>,
|
|
|
|
/// A set of tags to recognize the node
|
|
|
|
pub tags: Vec<String>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl NodeRole {
|
|
|
|
pub fn capacity_string(&self) -> String {
|
|
|
|
match self.capacity {
|
|
|
|
Some(c) => format!("{}", c),
|
|
|
|
None => "gateway".to_string(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ClusterLayout {
|
|
|
|
pub fn new(replication_factor: usize) -> Self {
|
|
|
|
let empty_lwwmap = LwwMap::new();
|
|
|
|
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
|
|
|
|
|
|
|
|
ClusterLayout {
|
|
|
|
version: 0,
|
|
|
|
replication_factor,
|
|
|
|
roles: LwwMap::new(),
|
|
|
|
node_id_vec: Vec::new(),
|
|
|
|
ring_assignation_data: Vec::new(),
|
|
|
|
staging: empty_lwwmap,
|
|
|
|
staging_hash: empty_lwwmap_hash,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
|
|
|
|
match other.version.cmp(&self.version) {
|
|
|
|
Ordering::Greater => {
|
|
|
|
*self = other.clone();
|
|
|
|
true
|
|
|
|
}
|
|
|
|
Ordering::Equal => {
|
|
|
|
self.staging.merge(&other.staging);
|
|
|
|
|
|
|
|
let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
|
|
|
let changed = new_staging_hash != self.staging_hash;
|
|
|
|
|
|
|
|
self.staging_hash = new_staging_hash;
|
|
|
|
|
|
|
|
changed
|
|
|
|
}
|
|
|
|
Ordering::Less => false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a list of IDs of nodes that currently have
|
|
|
|
/// a role in the cluster
|
|
|
|
pub fn node_ids(&self) -> &[Uuid] {
|
|
|
|
&self.node_id_vec[..]
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn num_nodes(&self) -> usize {
|
|
|
|
self.node_id_vec.len()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the role of a node in the layout
|
|
|
|
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
|
|
|
|
match self.roles.get(node) {
|
|
|
|
Some(NodeRoleV(Some(v))) => Some(v),
|
|
|
|
_ => None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Check a cluster layout for internal consistency
|
|
|
|
/// returns true if consistent, false if error
|
|
|
|
pub fn check(&self) -> bool {
|
|
|
|
// Check that the hash of the staging data is correct
|
|
|
|
let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
|
|
|
if staging_hash != self.staging_hash {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that node_id_vec contains the correct list of nodes
|
|
|
|
let mut expected_nodes = self
|
|
|
|
.roles
|
|
|
|
.items()
|
|
|
|
.iter()
|
|
|
|
.filter(|(_, _, v)| v.0.is_some())
|
|
|
|
.map(|(id, _, _)| *id)
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
expected_nodes.sort();
|
|
|
|
let mut node_id_vec = self.node_id_vec.clone();
|
|
|
|
node_id_vec.sort();
|
|
|
|
if expected_nodes != node_id_vec {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that the assignation data has the correct length
|
|
|
|
if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that the assigned nodes are correct identifiers
|
|
|
|
// of nodes that are assigned a role
|
|
|
|
// and that role is not the role of a gateway nodes
|
|
|
|
for x in self.ring_assignation_data.iter() {
|
|
|
|
if *x as usize >= self.node_id_vec.len() {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
let node = self.node_id_vec[*x as usize];
|
|
|
|
match self.roles.get(&node) {
|
|
|
|
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
|
|
|
|
_ => return false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
true
|
|
|
|
}
|
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
/// This function calculates a new partition-to-node assignation.
|
|
|
|
/// The computed assignation maximizes the capacity of a
|
|
|
|
/// partition (assuming all partitions have the same size).
|
|
|
|
/// Among such optimal assignation, it minimizes the distance to
|
|
|
|
/// the former assignation (if any) to minimize the amount of
|
|
|
|
/// data to be moved. A heuristic ensures node triplets
|
|
|
|
/// dispersion (in garage_util::bipartite::optimize_matching()).
|
|
|
|
pub fn calculate_partition_assignation(&mut self) -> bool {
|
|
|
|
//The nodes might have been updated, some might have been deleted.
|
|
|
|
//So we need to first update the list of nodes and retrieve the
|
|
|
|
//assignation.
|
|
|
|
let old_node_assignation = self.update_nodes_and_ring();
|
|
|
|
|
|
|
|
let (node_zone, _) = self.get_node_zone_capacity();
|
|
|
|
|
|
|
|
//We compute the optimal number of partition to assign to
|
|
|
|
//every node and zone.
|
|
|
|
if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions() {
|
|
|
|
//We collect part_per_zone in a vec to not rely on the
|
|
|
|
//arbitrary order in which elements are iterated in
|
|
|
|
//Hashmap::iter()
|
|
|
|
let part_per_zone_vec = part_per_zone
|
|
|
|
.iter()
|
|
|
|
.map(|(x, y)| (x.clone(), *y))
|
|
|
|
.collect::<Vec<(String, usize)>>();
|
|
|
|
//We create an indexing of the zones
|
|
|
|
let mut zone_id = HashMap::<String, usize>::new();
|
|
|
|
for i in 0..part_per_zone_vec.len() {
|
|
|
|
zone_id.insert(part_per_zone_vec[i].0.clone(), i);
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
//We compute a candidate for the new partition to zone
|
|
|
|
//assignation.
|
|
|
|
let nb_zones = part_per_zone.len();
|
|
|
|
let nb_nodes = part_per_nod.len();
|
|
|
|
let nb_partitions = 1 << PARTITION_BITS;
|
|
|
|
let left_cap_vec = vec![self.replication_factor as u32; nb_partitions];
|
|
|
|
let right_cap_vec = part_per_zone_vec.iter().map(|(_, y)| *y as u32).collect();
|
|
|
|
let mut zone_assignation = dinic_compute_matching(left_cap_vec, right_cap_vec);
|
|
|
|
|
|
|
|
//We create the structure for the partition-to-node assignation.
|
|
|
|
let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions];
|
|
|
|
//We will decrement part_per_nod to keep track of the number
|
|
|
|
//of partitions that we still have to associate.
|
|
|
|
let mut part_per_nod = part_per_nod.clone();
|
|
|
|
|
|
|
|
//We minimize the distance to the former assignation(if any)
|
|
|
|
|
|
|
|
//We get the id of the zones of the former assignation
|
|
|
|
//(and the id no_zone if there is no node assignated)
|
|
|
|
let no_zone = part_per_zone_vec.len();
|
|
|
|
let old_zone_assignation: Vec<Vec<usize>> = old_node_assignation
|
|
|
|
.iter()
|
|
|
|
.map(|x| {
|
|
|
|
x.iter()
|
|
|
|
.map(|id| match *id {
|
|
|
|
Some(i) => zone_id[&node_zone[i]],
|
|
|
|
None => no_zone,
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
//We minimize the distance to the former zone assignation
|
|
|
|
zone_assignation =
|
|
|
|
optimize_matching(&old_zone_assignation, &zone_assignation, nb_zones + 1); //+1 for no_zone
|
|
|
|
|
|
|
|
//We need to assign partitions to nodes in their zone
|
|
|
|
//We first put the nodes assignation that can stay the same
|
|
|
|
for i in 0..nb_partitions {
|
|
|
|
for j in 0..self.replication_factor {
|
|
|
|
if let Some(Some(former_node)) = old_node_assignation[i].iter().find(|x| {
|
|
|
|
if let Some(id) = x {
|
|
|
|
zone_id[&node_zone[*id]] == zone_assignation[i][j]
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}) {
|
|
|
|
if part_per_nod[*former_node] > 0 {
|
|
|
|
node_assignation[i][j] = Some(*former_node);
|
|
|
|
part_per_nod[*former_node] -= 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
//We complete the assignation of partitions to nodes
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
for i in 0..nb_partitions {
|
|
|
|
for j in 0..self.replication_factor {
|
|
|
|
if node_assignation[i][j] == None {
|
|
|
|
let possible_nodes: Vec<usize> = (0..nb_nodes)
|
|
|
|
.filter(|id| {
|
|
|
|
zone_id[&node_zone[*id]] == zone_assignation[i][j]
|
|
|
|
&& part_per_nod[*id] > 0
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
assert!(possible_nodes.len() > 0);
|
|
|
|
//We randomly pick a node
|
|
|
|
if let Some(nod) = possible_nodes.choose(&mut rng) {
|
|
|
|
node_assignation[i][j] = Some(*nod);
|
|
|
|
part_per_nod[*nod] -= 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//We write the assignation in the 1D table
|
|
|
|
self.ring_assignation_data = Vec::<CompactNodeType>::new();
|
|
|
|
for i in 0..nb_partitions {
|
|
|
|
for j in 0..self.replication_factor {
|
|
|
|
if let Some(id) = node_assignation[i][j] {
|
|
|
|
self.ring_assignation_data.push(id as CompactNodeType);
|
|
|
|
} else {
|
|
|
|
assert!(false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
true
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// The LwwMap of node roles might have changed. This function updates the node_id_vec
|
|
|
|
/// and returns the assignation given by ring, with the new indices of the nodes, and
|
|
|
|
/// None of the node is not present anymore.
|
|
|
|
/// We work with the assumption that only this function and calculate_new_assignation
|
|
|
|
/// do modify assignation_ring and node_id_vec.
|
|
|
|
fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> {
|
|
|
|
let nb_partitions = 1usize << PARTITION_BITS;
|
|
|
|
let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions];
|
|
|
|
let rf = self.replication_factor;
|
|
|
|
let ring = &self.ring_assignation_data;
|
|
|
|
|
|
|
|
let new_node_id_vec: Vec<Uuid> = self.roles.items().iter().map(|(k, _, _)| *k).collect();
|
|
|
|
|
|
|
|
if ring.len() == rf * nb_partitions {
|
|
|
|
for i in 0..nb_partitions {
|
|
|
|
for j in 0..self.replication_factor {
|
|
|
|
node_assignation[i][j] = new_node_id_vec
|
|
|
|
.iter()
|
|
|
|
.position(|id| *id == self.node_id_vec[ring[i * rf + j] as usize]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
self.node_id_vec = new_node_id_vec;
|
|
|
|
self.ring_assignation_data = vec![];
|
|
|
|
return node_assignation;
|
|
|
|
}
|
|
|
|
|
|
|
|
///This function compute the number of partition to assign to
|
|
|
|
///every node and zone, so that every partition is replicated
|
|
|
|
///self.replication_factor times and the capacity of a partition
|
|
|
|
///is maximized.
|
|
|
|
fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> {
|
|
|
|
let mut zone_capacity: HashMap<String, u32> = HashMap::new();
|
|
|
|
|
|
|
|
let (node_zone, node_capacity) = self.get_node_zone_capacity();
|
|
|
|
let nb_nodes = self.node_id_vec.len();
|
|
|
|
|
|
|
|
for i in 0..nb_nodes {
|
|
|
|
if zone_capacity.contains_key(&node_zone[i]) {
|
|
|
|
zone_capacity.insert(
|
|
|
|
node_zone[i].clone(),
|
|
|
|
zone_capacity[&node_zone[i]] + node_capacity[i],
|
|
|
|
);
|
|
|
|
} else {
|
|
|
|
zone_capacity.insert(node_zone[i].clone(), node_capacity[i]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//Compute the optimal number of partitions per zone
|
|
|
|
let sum_capacities: u32 = zone_capacity.values().sum();
|
|
|
|
|
|
|
|
if sum_capacities <= 0 {
|
|
|
|
println!("No storage capacity in the network.");
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
|
|
|
|
let nb_partitions = 1 << PARTITION_BITS;
|
|
|
|
|
|
|
|
//Initially we would like to use zones porportionally to
|
|
|
|
//their capacity.
|
|
|
|
//However, a large zone can be associated to at most
|
|
|
|
//nb_partitions to ensure replication of the date.
|
|
|
|
//So we take the min with nb_partitions:
|
|
|
|
let mut part_per_zone: HashMap<String, usize> = zone_capacity
|
|
|
|
.iter()
|
|
|
|
.map(|(k, v)| {
|
|
|
|
(
|
|
|
|
k.clone(),
|
|
|
|
min(
|
|
|
|
nb_partitions,
|
|
|
|
(self.replication_factor * nb_partitions * *v as usize)
|
|
|
|
/ sum_capacities as usize,
|
|
|
|
),
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
//The replication_factor-1 upper bounds the number of
|
|
|
|
//part_per_zones that are greater than nb_partitions
|
|
|
|
for _ in 1..self.replication_factor {
|
|
|
|
//The number of partitions that are not assignated to
|
|
|
|
//a zone that takes nb_partitions.
|
|
|
|
let sum_capleft: u32 = zone_capacity
|
|
|
|
.keys()
|
|
|
|
.filter(|k| part_per_zone[*k] < nb_partitions)
|
|
|
|
.map(|k| zone_capacity[k])
|
|
|
|
.sum();
|
|
|
|
|
|
|
|
//The number of replication of the data that we need
|
|
|
|
//to ensure.
|
|
|
|
let repl_left = self.replication_factor
|
|
|
|
- part_per_zone
|
|
|
|
.values()
|
|
|
|
.filter(|x| **x == nb_partitions)
|
|
|
|
.count();
|
|
|
|
if repl_left == 0 {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
for k in zone_capacity.keys() {
|
|
|
|
if part_per_zone[k] != nb_partitions {
|
|
|
|
part_per_zone.insert(
|
|
|
|
k.to_string(),
|
|
|
|
min(
|
|
|
|
nb_partitions,
|
|
|
|
(nb_partitions * zone_capacity[k] as usize * repl_left)
|
|
|
|
/ sum_capleft as usize,
|
|
|
|
),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//Now we divide the zone's partition share proportionally
|
|
|
|
//between their nodes.
|
|
|
|
|
|
|
|
let mut part_per_nod: Vec<usize> = (0..nb_nodes)
|
|
|
|
.map(|i| {
|
|
|
|
(part_per_zone[&node_zone[i]] * node_capacity[i] as usize)
|
|
|
|
/ zone_capacity[&node_zone[i]] as usize
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
//We must update the part_per_zone to make it correspond to
|
|
|
|
//part_per_nod (because of integer rounding)
|
|
|
|
part_per_zone = part_per_zone.iter().map(|(k, _)| (k.clone(), 0)).collect();
|
|
|
|
for i in 0..nb_nodes {
|
|
|
|
part_per_zone.insert(
|
|
|
|
node_zone[i].clone(),
|
|
|
|
part_per_zone[&node_zone[i]] + part_per_nod[i],
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
//Because of integer rounding, the total sum of part_per_nod
|
|
|
|
//might not be replication_factor*nb_partitions.
|
|
|
|
// We need at most to add 1 to every non maximal value of
|
|
|
|
// part_per_nod. The capacity of a partition will be bounded
|
|
|
|
// by the minimal value of
|
|
|
|
// node_capacity_vec[i]/part_per_nod[i]
|
|
|
|
// so we try to maximize this minimal value, keeping the
|
|
|
|
// part_per_zone capped
|
|
|
|
|
|
|
|
let discrepancy: usize =
|
|
|
|
nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>();
|
|
|
|
|
|
|
|
//We use a stupid O(N^2) algorithm. If the number of nodes
|
|
|
|
//is actually expected to be high, one should optimize this.
|
|
|
|
|
|
|
|
for _ in 0..discrepancy {
|
|
|
|
if let Some(idmax) = (0..nb_nodes)
|
|
|
|
.filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions)
|
|
|
|
.max_by(|i, j| {
|
|
|
|
(node_capacity[*i] * (part_per_nod[*j] + 1) as u32)
|
|
|
|
.cmp(&(node_capacity[*j] * (part_per_nod[*i] + 1) as u32))
|
|
|
|
}) {
|
|
|
|
part_per_nod[idmax] += 1;
|
|
|
|
part_per_zone.insert(
|
|
|
|
node_zone[idmax].clone(),
|
|
|
|
part_per_zone[&node_zone[idmax]] + 1,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
//We check the algorithm consistency
|
|
|
|
|
|
|
|
let discrepancy: usize =
|
|
|
|
nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>();
|
|
|
|
assert!(discrepancy == 0);
|
|
|
|
assert!(if let Some(v) = part_per_zone.values().max() {
|
|
|
|
*v <= nb_partitions
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
});
|
|
|
|
|
|
|
|
Some((part_per_nod, part_per_zone))
|
|
|
|
}
|
|
|
|
|
|
|
|
//Returns vectors of zone and capacity; indexed by the same (temporary)
|
|
|
|
//indices as node_id_vec.
|
|
|
|
fn get_node_zone_capacity(&self) -> (Vec<String>, Vec<u32>) {
|
|
|
|
let node_zone = self
|
|
|
|
.node_id_vec
|
|
|
|
.iter()
|
|
|
|
.map(|id_nod| match self.node_role(id_nod) {
|
|
|
|
Some(NodeRole {
|
|
|
|
zone,
|
|
|
|
capacity: _,
|
|
|
|
tags: _,
|
|
|
|
}) => zone.clone(),
|
|
|
|
_ => "".to_string(),
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
let node_capacity = self
|
|
|
|
.node_id_vec
|
|
|
|
.iter()
|
|
|
|
.map(|id_nod| match self.node_role(id_nod) {
|
|
|
|
Some(NodeRole {
|
|
|
|
zone: _,
|
|
|
|
capacity,
|
|
|
|
tags: _,
|
|
|
|
}) => {
|
|
|
|
if let Some(c) = capacity {
|
|
|
|
*c
|
|
|
|
} else {
|
|
|
|
0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => 0,
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
(node_zone, node_capacity)
|
|
|
|
}
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:54:19 +00:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
2022-05-01 07:57:05 +00:00
|
|
|
use super::*;
|
|
|
|
use itertools::Itertools;
|
|
|
|
|
|
|
|
fn check_assignation(cl: &ClusterLayout) {
|
|
|
|
//Check that input data has the right format
|
|
|
|
let nb_partitions = 1usize << PARTITION_BITS;
|
|
|
|
assert!([1, 2, 3].contains(&cl.replication_factor));
|
|
|
|
assert!(cl.ring_assignation_data.len() == nb_partitions * cl.replication_factor);
|
|
|
|
|
|
|
|
let (node_zone, node_capacity) = cl.get_node_zone_capacity();
|
|
|
|
|
|
|
|
//Check that is is a correct assignation with zone redundancy
|
|
|
|
let rf = cl.replication_factor;
|
|
|
|
for i in 0..nb_partitions {
|
|
|
|
assert!(
|
|
|
|
rf == cl.ring_assignation_data[rf * i..rf * (i + 1)]
|
|
|
|
.iter()
|
|
|
|
.map(|nod| node_zone[*nod as usize].clone())
|
|
|
|
.unique()
|
|
|
|
.count()
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
let nb_nodes = cl.node_id_vec.len();
|
|
|
|
//Check optimality
|
|
|
|
let node_nb_part = (0..nb_nodes)
|
|
|
|
.map(|i| {
|
|
|
|
cl.ring_assignation_data
|
|
|
|
.iter()
|
|
|
|
.filter(|x| **x == i as u8)
|
|
|
|
.count()
|
|
|
|
})
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
let zone_vec = node_zone.iter().unique().collect::<Vec<_>>();
|
|
|
|
let zone_nb_part = zone_vec
|
|
|
|
.iter()
|
|
|
|
.map(|z| {
|
|
|
|
cl.ring_assignation_data
|
|
|
|
.iter()
|
|
|
|
.filter(|x| node_zone[**x as usize] == **z)
|
|
|
|
.count()
|
|
|
|
})
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
//Check optimality of the zone assignation : would it be better for the
|
|
|
|
//node_capacity/node_partitions ratio to change the assignation of a partition
|
|
|
|
|
|
|
|
if let Some(idmin) = (0..nb_nodes).min_by(|i, j| {
|
|
|
|
(node_capacity[*i] * node_nb_part[*j] as u32)
|
|
|
|
.cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
|
|
|
|
}) {
|
|
|
|
if let Some(idnew) = (0..nb_nodes)
|
|
|
|
.filter(|i| {
|
|
|
|
if let Some(p) = zone_vec.iter().position(|z| **z == node_zone[*i]) {
|
|
|
|
zone_nb_part[p] < nb_partitions
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.max_by(|i, j| {
|
|
|
|
(node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
|
|
|
|
.cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
|
|
|
|
}) {
|
|
|
|
assert!(
|
|
|
|
node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
|
|
|
|
>= node_capacity[idnew] * node_nb_part[idmin] as u32
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
//In every zone, check optimality of the nod assignation
|
|
|
|
for z in zone_vec {
|
|
|
|
let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z);
|
|
|
|
if let Some(idmin) = node_of_z_iter.clone().min_by(|i, j| {
|
|
|
|
(node_capacity[*i] * node_nb_part[*j] as u32)
|
|
|
|
.cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
|
|
|
|
}) {
|
|
|
|
if let Some(idnew) = node_of_z_iter.min_by(|i, j| {
|
|
|
|
(node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
|
|
|
|
.cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
|
|
|
|
}) {
|
|
|
|
assert!(
|
|
|
|
node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
|
|
|
|
>= node_capacity[idnew] * node_nb_part[idmin] as u32
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn update_layout(
|
|
|
|
cl: &mut ClusterLayout,
|
|
|
|
node_id_vec: &Vec<u8>,
|
|
|
|
node_capacity_vec: &Vec<u32>,
|
|
|
|
node_zone_vec: &Vec<String>,
|
|
|
|
) {
|
|
|
|
for i in 0..node_id_vec.len() {
|
|
|
|
if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
|
|
|
|
cl.node_id_vec.push(x);
|
|
|
|
}
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
let update = cl.roles.update_mutator(
|
|
|
|
cl.node_id_vec[i],
|
|
|
|
NodeRoleV(Some(NodeRole {
|
|
|
|
zone: (node_zone_vec[i].to_string()),
|
|
|
|
capacity: (Some(node_capacity_vec[i])),
|
|
|
|
tags: (vec![]),
|
|
|
|
})),
|
|
|
|
);
|
|
|
|
cl.roles.merge(&update);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_assignation() {
|
|
|
|
let mut node_id_vec = vec![1, 2, 3];
|
|
|
|
let mut node_capacity_vec = vec![4000, 1000, 2000];
|
|
|
|
let mut node_zone_vec = vec!["A", "B", "C"]
|
|
|
|
.into_iter()
|
|
|
|
.map(|x| x.to_string())
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
let mut cl = ClusterLayout {
|
|
|
|
node_id_vec: vec![],
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
roles: LwwMap::new(),
|
2021-11-09 11:24:04 +00:00
|
|
|
|
2022-05-01 07:57:05 +00:00
|
|
|
replication_factor: 3,
|
|
|
|
ring_assignation_data: vec![],
|
|
|
|
version: 0,
|
|
|
|
staging: LwwMap::new(),
|
|
|
|
staging_hash: sha256sum(&[1; 32]),
|
|
|
|
};
|
|
|
|
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
|
|
|
|
cl.calculate_partition_assignation();
|
|
|
|
check_assignation(&cl);
|
|
|
|
|
|
|
|
node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
|
|
|
|
node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
|
|
|
|
node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
|
|
|
|
.into_iter()
|
|
|
|
.map(|x| x.to_string())
|
|
|
|
.collect();
|
|
|
|
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
|
|
|
|
cl.calculate_partition_assignation();
|
|
|
|
check_assignation(&cl);
|
|
|
|
|
|
|
|
node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
|
|
|
|
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
|
|
|
|
cl.calculate_partition_assignation();
|
|
|
|
check_assignation(&cl);
|
|
|
|
|
|
|
|
node_capacity_vec = vec![4000, 4000, 2000, 7000, 1000, 9000, 2000, 10, 2000];
|
|
|
|
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
|
|
|
|
cl.calculate_partition_assignation();
|
|
|
|
check_assignation(&cl);
|
|
|
|
}
|
|
|
|
}
|