use std::cmp::min; use std::cmp::Ordering; use std::collections::HashMap; use serde::{Deserialize, Serialize}; use garage_util::bipartite::*; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; use rand::prelude::SliceRandom; use crate::ring::*; /// The layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, pub roles: LwwMap, /// node_id_vec: a vector of node IDs with a role assigned /// in the system (this includes gateway nodes). /// The order here is different than the vec stored by `roles`, because: /// 1. non-gateway nodes are first so that they have lower numbers /// 2. nodes that don't have a role are excluded (but they need to /// stay in the CRDT as tombstones) pub node_id_vec: Vec, /// the assignation of data partitions to node, the values /// are indices in node_id_vec #[serde(with = "serde_bytes")] pub ring_assignation_data: Vec, /// Role changes which are staged for the next version of the layout pub staging: LwwMap, pub staging_hash: Hash, } #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option); impl AutoCrdt for NodeRoleV { const WARN_IF_DIFFERENT: bool = true; } /// The user-assigned roles of cluster nodes #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRole { /// Datacenter at which this entry belong. This information might be used to perform a better /// geodistribution pub zone: String, /// The (relative) capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes pub capacity: Option, /// A set of tags to recognize the node pub tags: Vec, } impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { Some(c) => format!("{}", c), None => "gateway".to_string(), } } } impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { let empty_lwwmap = LwwMap::new(); let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); ClusterLayout { version: 0, replication_factor, roles: LwwMap::new(), node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), staging: empty_lwwmap, staging_hash: empty_lwwmap_hash, } } pub fn merge(&mut self, other: &ClusterLayout) -> bool { match other.version.cmp(&self.version) { Ordering::Greater => { *self = other.clone(); true } Ordering::Equal => { self.staging.merge(&other.staging); let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); let changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; changed } Ordering::Less => false, } } /// Returns a list of IDs of nodes that currently have /// a role in the cluster pub fn node_ids(&self) -> &[Uuid] { &self.node_id_vec[..] } pub fn num_nodes(&self) -> usize { self.node_id_vec.len() } /// Returns the role of a node in the layout pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { match self.roles.get(node) { Some(NodeRoleV(Some(v))) => Some(v), _ => None, } } /// Check a cluster layout for internal consistency /// returns true if consistent, false if error pub fn check(&self) -> bool { // Check that the hash of the staging data is correct let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); if staging_hash != self.staging_hash { return false; } // Check that node_id_vec contains the correct list of nodes let mut expected_nodes = self .roles .items() .iter() .filter(|(_, _, v)| v.0.is_some()) .map(|(id, _, _)| *id) .collect::>(); expected_nodes.sort(); let mut node_id_vec = self.node_id_vec.clone(); node_id_vec.sort(); if expected_nodes != node_id_vec { return false; } // Check that the assignation data has the correct length if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor { return false; } // Check that the assigned nodes are correct identifiers // of nodes that are assigned a role // and that role is not the role of a gateway nodes for x in self.ring_assignation_data.iter() { if *x as usize >= self.node_id_vec.len() { return false; } let node = self.node_id_vec[*x as usize]; match self.roles.get(&node) { Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), _ => return false, } } true } /// This function calculates a new partition-to-node assignation. /// The computed assignation maximizes the capacity of a /// partition (assuming all partitions have the same size). /// Among such optimal assignation, it minimizes the distance to /// the former assignation (if any) to minimize the amount of /// data to be moved. A heuristic ensures node triplets /// dispersion (in garage_util::bipartite::optimize_matching()). pub fn calculate_partition_assignation(&mut self) -> bool { //The nodes might have been updated, some might have been deleted. //So we need to first update the list of nodes and retrieve the //assignation. let old_node_assignation = self.update_nodes_and_ring(); let (node_zone, _) = self.get_node_zone_capacity(); //We compute the optimal number of partition to assign to //every node and zone. if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions() { //We collect part_per_zone in a vec to not rely on the //arbitrary order in which elements are iterated in //Hashmap::iter() let part_per_zone_vec = part_per_zone .iter() .map(|(x, y)| (x.clone(), *y)) .collect::>(); //We create an indexing of the zones let mut zone_id = HashMap::::new(); for (i, ppz) in part_per_zone_vec.iter().enumerate() { zone_id.insert(ppz.0.clone(), i); } //We compute a candidate for the new partition to zone //assignation. let nb_zones = part_per_zone.len(); let nb_nodes = part_per_nod.len(); let nb_partitions = 1 << PARTITION_BITS; let left_cap_vec = vec![self.replication_factor as u32; nb_partitions]; let right_cap_vec = part_per_zone_vec.iter().map(|(_, y)| *y as u32).collect(); let mut zone_assignation = dinic_compute_matching(left_cap_vec, right_cap_vec); //We create the structure for the partition-to-node assignation. let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions]; //We will decrement part_per_nod to keep track of the number //of partitions that we still have to associate. let mut part_per_nod = part_per_nod; //We minimize the distance to the former assignation(if any) //We get the id of the zones of the former assignation //(and the id no_zone if there is no node assignated) let no_zone = part_per_zone_vec.len(); let old_zone_assignation: Vec> = old_node_assignation .iter() .map(|x| { x.iter() .map(|id| match *id { Some(i) => zone_id[&node_zone[i]], None => no_zone, }) .collect() }) .collect(); //We minimize the distance to the former zone assignation zone_assignation = optimize_matching(&old_zone_assignation, &zone_assignation, nb_zones + 1); //+1 for no_zone //We need to assign partitions to nodes in their zone //We first put the nodes assignation that can stay the same for i in 0..nb_partitions { for j in 0..self.replication_factor { if let Some(Some(former_node)) = old_node_assignation[i].iter().find(|x| { if let Some(id) = x { zone_id[&node_zone[*id]] == zone_assignation[i][j] } else { false } }) { if part_per_nod[*former_node] > 0 { node_assignation[i][j] = Some(*former_node); part_per_nod[*former_node] -= 1; } } } } //We complete the assignation of partitions to nodes let mut rng = rand::thread_rng(); for i in 0..nb_partitions { for j in 0..self.replication_factor { if node_assignation[i][j] == None { let possible_nodes: Vec = (0..nb_nodes) .filter(|id| { zone_id[&node_zone[*id]] == zone_assignation[i][j] && part_per_nod[*id] > 0 }) .collect(); assert!(!possible_nodes.is_empty()); //We randomly pick a node if let Some(nod) = possible_nodes.choose(&mut rng) { node_assignation[i][j] = Some(*nod); part_per_nod[*nod] -= 1; } } } } //We write the assignation in the 1D table self.ring_assignation_data = Vec::::new(); for ass in node_assignation { for nod in ass { if let Some(id) = nod { self.ring_assignation_data.push(id as CompactNodeType); } else { panic!() } } } true } else { false } } /// The LwwMap of node roles might have changed. This function updates the node_id_vec /// and returns the assignation given by ring, with the new indices of the nodes, and /// None of the node is not present anymore. /// We work with the assumption that only this function and calculate_new_assignation /// do modify assignation_ring and node_id_vec. fn update_nodes_and_ring(&mut self) -> Vec>> { let nb_partitions = 1usize << PARTITION_BITS; let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions]; let rf = self.replication_factor; let ring = &self.ring_assignation_data; let new_node_id_vec: Vec = self.roles.items().iter().map(|(k, _, _)| *k).collect(); if ring.len() == rf * nb_partitions { for i in 0..nb_partitions { for j in 0..self.replication_factor { node_assignation[i][j] = new_node_id_vec .iter() .position(|id| *id == self.node_id_vec[ring[i * rf + j] as usize]); } } } self.node_id_vec = new_node_id_vec; self.ring_assignation_data = vec![]; node_assignation } ///This function compute the number of partition to assign to ///every node and zone, so that every partition is replicated ///self.replication_factor times and the capacity of a partition ///is maximized. fn optimal_proportions(&mut self) -> Option<(Vec, HashMap)> { let mut zone_capacity: HashMap = HashMap::new(); let (node_zone, node_capacity) = self.get_node_zone_capacity(); let nb_nodes = self.node_id_vec.len(); for i in 0..nb_nodes { if zone_capacity.contains_key(&node_zone[i]) { zone_capacity.insert( node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i], ); } else { zone_capacity.insert(node_zone[i].clone(), node_capacity[i]); } } //Compute the optimal number of partitions per zone let sum_capacities: u32 = zone_capacity.values().sum(); if sum_capacities == 0 { println!("No storage capacity in the network."); return None; } let nb_partitions = 1 << PARTITION_BITS; //Initially we would like to use zones porportionally to //their capacity. //However, a large zone can be associated to at most //nb_partitions to ensure replication of the date. //So we take the min with nb_partitions: let mut part_per_zone: HashMap = zone_capacity .iter() .map(|(k, v)| { ( k.clone(), min( nb_partitions, (self.replication_factor * nb_partitions * *v as usize) / sum_capacities as usize, ), ) }) .collect(); //The replication_factor-1 upper bounds the number of //part_per_zones that are greater than nb_partitions for _ in 1..self.replication_factor { //The number of partitions that are not assignated to //a zone that takes nb_partitions. let sum_capleft: u32 = zone_capacity .keys() .filter(|k| part_per_zone[*k] < nb_partitions) .map(|k| zone_capacity[k]) .sum(); //The number of replication of the data that we need //to ensure. let repl_left = self.replication_factor - part_per_zone .values() .filter(|x| **x == nb_partitions) .count(); if repl_left == 0 { break; } for k in zone_capacity.keys() { if part_per_zone[k] != nb_partitions { part_per_zone.insert( k.to_string(), min( nb_partitions, (nb_partitions * zone_capacity[k] as usize * repl_left) / sum_capleft as usize, ), ); } } } //Now we divide the zone's partition share proportionally //between their nodes. let mut part_per_nod: Vec = (0..nb_nodes) .map(|i| { (part_per_zone[&node_zone[i]] * node_capacity[i] as usize) / zone_capacity[&node_zone[i]] as usize }) .collect(); //We must update the part_per_zone to make it correspond to //part_per_nod (because of integer rounding) part_per_zone = part_per_zone.iter().map(|(k, _)| (k.clone(), 0)).collect(); for i in 0..nb_nodes { part_per_zone.insert( node_zone[i].clone(), part_per_zone[&node_zone[i]] + part_per_nod[i], ); } //Because of integer rounding, the total sum of part_per_nod //might not be replication_factor*nb_partitions. // We need at most to add 1 to every non maximal value of // part_per_nod. The capacity of a partition will be bounded // by the minimal value of // node_capacity_vec[i]/part_per_nod[i] // so we try to maximize this minimal value, keeping the // part_per_zone capped let discrepancy: usize = nb_partitions * self.replication_factor - part_per_nod.iter().sum::(); //We use a stupid O(N^2) algorithm. If the number of nodes //is actually expected to be high, one should optimize this. for _ in 0..discrepancy { if let Some(idmax) = (0..nb_nodes) .filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) .max_by(|i, j| { (node_capacity[*i] * (part_per_nod[*j] + 1) as u32) .cmp(&(node_capacity[*j] * (part_per_nod[*i] + 1) as u32)) }) { part_per_nod[idmax] += 1; part_per_zone.insert( node_zone[idmax].clone(), part_per_zone[&node_zone[idmax]] + 1, ); } } //We check the algorithm consistency let discrepancy: usize = nb_partitions * self.replication_factor - part_per_nod.iter().sum::(); assert!(discrepancy == 0); assert!(if let Some(v) = part_per_zone.values().max() { *v <= nb_partitions } else { false }); Some((part_per_nod, part_per_zone)) } //Returns vectors of zone and capacity; indexed by the same (temporary) //indices as node_id_vec. fn get_node_zone_capacity(&self) -> (Vec, Vec) { let node_zone = self .node_id_vec .iter() .map(|id_nod| match self.node_role(id_nod) { Some(NodeRole { zone, capacity: _, tags: _, }) => zone.clone(), _ => "".to_string(), }) .collect(); let node_capacity = self .node_id_vec .iter() .map(|id_nod| match self.node_role(id_nod) { Some(NodeRole { zone: _, capacity: Some(c), tags: _, }) => { *c } _ => 0, }) .collect(); (node_zone, node_capacity) } } #[cfg(test)] mod tests { use super::*; use itertools::Itertools; fn check_assignation(cl: &ClusterLayout) { //Check that input data has the right format let nb_partitions = 1usize << PARTITION_BITS; assert!([1, 2, 3].contains(&cl.replication_factor)); assert!(cl.ring_assignation_data.len() == nb_partitions * cl.replication_factor); let (node_zone, node_capacity) = cl.get_node_zone_capacity(); //Check that is is a correct assignation with zone redundancy let rf = cl.replication_factor; for i in 0..nb_partitions { assert!( rf == cl.ring_assignation_data[rf * i..rf * (i + 1)] .iter() .map(|nod| node_zone[*nod as usize].clone()) .unique() .count() ); } let nb_nodes = cl.node_id_vec.len(); //Check optimality let node_nb_part = (0..nb_nodes) .map(|i| { cl.ring_assignation_data .iter() .filter(|x| **x == i as u8) .count() }) .collect::>(); let zone_vec = node_zone.iter().unique().collect::>(); let zone_nb_part = zone_vec .iter() .map(|z| { cl.ring_assignation_data .iter() .filter(|x| node_zone[**x as usize] == **z) .count() }) .collect::>(); //Check optimality of the zone assignation : would it be better for the //node_capacity/node_partitions ratio to change the assignation of a partition if let Some(idmin) = (0..nb_nodes).min_by(|i, j| { (node_capacity[*i] * node_nb_part[*j] as u32) .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32)) }) { if let Some(idnew) = (0..nb_nodes) .filter(|i| { if let Some(p) = zone_vec.iter().position(|z| **z == node_zone[*i]) { zone_nb_part[p] < nb_partitions } else { false } }) .max_by(|i, j| { (node_capacity[*i] * (node_nb_part[*j] as u32 + 1)) .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1))) }) { assert!( node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1) >= node_capacity[idnew] * node_nb_part[idmin] as u32 ); } } //In every zone, check optimality of the nod assignation for z in zone_vec { let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z); if let Some(idmin) = node_of_z_iter.clone().min_by(|i, j| { (node_capacity[*i] * node_nb_part[*j] as u32) .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32)) }) { if let Some(idnew) = node_of_z_iter.min_by(|i, j| { (node_capacity[*i] * (node_nb_part[*j] as u32 + 1)) .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1))) }) { assert!( node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1) >= node_capacity[idnew] * node_nb_part[idmin] as u32 ); } } } } fn update_layout( cl: &mut ClusterLayout, node_id_vec: &Vec, node_capacity_vec: &Vec, node_zone_vec: &Vec, ) { for i in 0..node_id_vec.len() { if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) { cl.node_id_vec.push(x); } let update = cl.roles.update_mutator( cl.node_id_vec[i], NodeRoleV(Some(NodeRole { zone: (node_zone_vec[i].to_string()), capacity: (Some(node_capacity_vec[i])), tags: (vec![]), })), ); cl.roles.merge(&update); } } #[test] fn test_assignation() { let mut node_id_vec = vec![1, 2, 3]; let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"] .into_iter() .map(|x| x.to_string()) .collect(); let mut cl = ClusterLayout { node_id_vec: vec![], roles: LwwMap::new(), replication_factor: 3, ring_assignation_data: vec![], version: 0, staging: LwwMap::new(), staging_hash: sha256sum(&[1; 32]), }; update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); cl.calculate_partition_assignation(); check_assignation(&cl); node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000]; node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"] .into_iter() .map(|x| x.to_string()) .collect(); update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); cl.calculate_partition_assignation(); check_assignation(&cl); node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); cl.calculate_partition_assignation(); check_assignation(&cl); node_capacity_vec = vec![4000, 4000, 2000, 7000, 1000, 9000, 2000, 10, 2000]; update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); cl.calculate_partition_assignation(); check_assignation(&cl); } }