2021-11-09 12:24:04 +01:00
use std ::cmp ::Ordering ;
2022-05-01 09:57:05 +02:00
use std ::collections ::HashMap ;
2022-09-21 14:39:59 +02:00
use std ::collections ::HashSet ;
use hex ::ToHex ;
2021-11-09 12:24:04 +01:00
use serde ::{ Deserialize , Serialize } ;
use garage_util ::crdt ::{ AutoCrdt , Crdt , LwwMap } ;
use garage_util ::data ::* ;
2022-05-01 09:54:19 +02:00
2022-09-21 14:39:59 +02:00
use crate ::graph_algo ::* ;
2021-11-09 12:24:04 +01:00
use crate ::ring ::* ;
2022-09-21 14:39:59 +02:00
use std ::convert ::TryInto ;
/// Report produced by the layout computation: one human-readable line per entry.
/// The functions below push their diagnostics and statistics into it.
type Message = Vec < String > ;
2021-11-09 12:24:04 +01:00
/// The layout of the cluster, i.e. the list of roles
/// which are assigned to each cluster node
#[ derive(Clone, Debug, Serialize, Deserialize) ]
pub struct ClusterLayout {
pub version : u64 ,
pub replication_factor : usize ,
2022-09-21 14:39:59 +02:00
#[ serde(default= " default_one " ) ]
pub zone_redundancy : usize ,
//This attribute is only used to retain the previously computed partition size,
//to know to what extent does it change with the layout update.
#[ serde(default= " default_zero " ) ]
pub partition_size : u32 ,
2021-11-09 12:24:04 +01:00
pub roles : LwwMap < Uuid , NodeRoleV > ,
/// node_id_vec: a vector of node IDs with a role assigned
/// in the system (this includes gateway nodes).
/// The order here is different than the vec stored by `roles`, because:
2022-09-21 14:39:59 +02:00
/// 1. non-gateway nodes are first so that they have lower numbers holding
/// in u8 (the number of non-gateway nodes is at most 256).
2021-11-09 12:24:04 +01:00
/// 2. nodes that don't have a role are excluded (but they need to
/// stay in the CRDT as tombstones)
pub node_id_vec : Vec < Uuid > ,
/// the assignation of data partitions to node, the values
/// are indices in node_id_vec
#[ serde(with = " serde_bytes " ) ]
pub ring_assignation_data : Vec < CompactNodeType > ,
/// Role changes which are staged for the next version of the layout
pub staging : LwwMap < Uuid , NodeRoleV > ,
pub staging_hash : Hash ,
}
2022-09-21 14:39:59 +02:00
/// Fallback value (1) used by serde for a field missing from a serialized layout.
fn default_one() -> usize {
    1
}
/// Fallback value (0) used by serde for a field missing from a serialized layout.
fn default_zero() -> u32 {
    0
}
const NB_PARTITIONS : usize = 1 usize < < PARTITION_BITS ;
2021-11-09 12:24:04 +01:00
/// Wrapper around the optional role of a node, as stored in the `roles` and
/// `staging` maps of the layout. `NodeRoleV(None)` stands for a node that has
/// no role (such entries are ignored by `ClusterLayout::node_role`).
#[ derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize) ]
pub struct NodeRoleV ( pub Option < NodeRole > ) ;
// NOTE(review): the merge behaviour and the exact meaning of WARN_IF_DIFFERENT
// are defined by `garage_util::crdt::AutoCrdt`, which is not visible here.
impl AutoCrdt for NodeRoleV {
const WARN_IF_DIFFERENT : bool = true ;
}
/// The user-assigned role of a cluster node: its zone, its storage capacity
/// (or none for a gateway) and free-form tags.
#[ derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize) ]
pub struct NodeRole {
/// Datacenter (zone) to which this node belongs. The layout computation uses
/// it to spread the copies of a partition over distinct zones.
pub zone : String ,
/// The (relative) capacity of the node.
/// If this is set to None, the node does not participate in storing data for the system
/// and is only active as an API gateway to other nodes
pub capacity : Option < u32 > ,
/// A set of tags to recognize the node
pub tags : Vec < String > ,
}
impl NodeRole {
pub fn capacity_string ( & self ) -> String {
match self . capacity {
Some ( c ) = > format! ( " {} " , c ) ,
None = > " gateway " . to_string ( ) ,
}
}
2022-09-21 14:39:59 +02:00
pub fn tags_string ( & self ) -> String {
let mut tags = String ::new ( ) ;
if self . tags . len ( ) = = 0 {
return tags
}
tags . push_str ( & self . tags [ 0 ] . clone ( ) ) ;
for t in 1 .. self . tags . len ( ) {
tags . push_str ( " , " ) ;
tags . push_str ( & self . tags [ t ] . clone ( ) ) ;
}
return tags ;
}
2021-11-09 12:24:04 +01:00
}
impl ClusterLayout {
2022-09-21 14:39:59 +02:00
pub fn new ( replication_factor : usize , zone_redundancy : usize ) -> Self {
2021-11-09 12:24:04 +01:00
let empty_lwwmap = LwwMap ::new ( ) ;
let empty_lwwmap_hash = blake2sum ( & rmp_to_vec_all_named ( & empty_lwwmap ) . unwrap ( ) [ .. ] ) ;
ClusterLayout {
version : 0 ,
replication_factor ,
2022-09-21 14:39:59 +02:00
zone_redundancy ,
partition_size : 0 ,
2021-11-09 12:24:04 +01:00
roles : LwwMap ::new ( ) ,
node_id_vec : Vec ::new ( ) ,
ring_assignation_data : Vec ::new ( ) ,
staging : empty_lwwmap ,
staging_hash : empty_lwwmap_hash ,
}
}
pub fn merge ( & mut self , other : & ClusterLayout ) -> bool {
match other . version . cmp ( & self . version ) {
Ordering ::Greater = > {
* self = other . clone ( ) ;
true
}
Ordering ::Equal = > {
self . staging . merge ( & other . staging ) ;
let new_staging_hash = blake2sum ( & rmp_to_vec_all_named ( & self . staging ) . unwrap ( ) [ .. ] ) ;
let changed = new_staging_hash ! = self . staging_hash ;
self . staging_hash = new_staging_hash ;
changed
}
Ordering ::Less = > false ,
}
}
/// Returns the IDs of the nodes that currently have a role in the cluster,
/// in the order of `node_id_vec`.
pub fn node_ids(&self) -> &[Uuid] {
    self.node_id_vec.as_slice()
}
/// Returns the number of entries of `node_id_vec`.
pub fn num_nodes(&self) -> usize {
    let nodes_with_role = &self.node_id_vec;
    nodes_with_role.len()
}
/// Returns the role of a node in the layout, or `None` when the node is
/// unknown or its entry carries no role.
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
    self.roles.get(node).and_then(|entry| entry.0.as_ref())
}
2022-09-21 14:39:59 +02:00
/// Returns the uuids of the non-gateway nodes of `self.node_id_vec`, i.e. the
/// nodes whose role has a capacity, in the order of `node_id_vec`.
pub fn useful_nodes(&self) -> Vec<Uuid> {
    self.node_id_vec
        .iter()
        .filter(|id| matches!(self.node_role(id), Some(role) if role.capacity.is_some()))
        .copied()
        .collect()
}
/// Given a node uuid, returns the label of its zone.
/// Fails when the node has no role in the layout.
pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, String> {
    self.node_role(uuid)
        .map(|role| role.zone.clone())
        .ok_or_else(|| {
            "The Uuid does not correspond to a node present in the cluster.".to_string()
        })
}
/// Given a node uuid, returns its capacity.
/// Fails when the node has no role or when its role has no capacity (gateway).
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, String> {
    self.node_role(uuid)
        .and_then(|role| role.capacity)
        .ok_or_else(|| {
            "The Uuid does not correspond to a node present in the cluster or this node does not have a positive capacity.".to_string()
        })
}
///Returns the sum of capacities of non gateway nodes in the cluster
pub fn get_total_capacity ( & self ) -> Result < u32 , String > {
let mut total_capacity = 0 ;
for uuid in self . useful_nodes ( ) . iter ( ) {
total_capacity + = self . get_node_capacity ( uuid ) ? ;
}
return Ok ( total_capacity ) ;
}
2021-11-09 12:24:04 +01:00
/// Check a cluster layout for internal consistency
/// returns true if consistent, false if error
pub fn check ( & self ) -> bool {
// Check that the hash of the staging data is correct
let staging_hash = blake2sum ( & rmp_to_vec_all_named ( & self . staging ) . unwrap ( ) [ .. ] ) ;
if staging_hash ! = self . staging_hash {
return false ;
}
// Check that node_id_vec contains the correct list of nodes
let mut expected_nodes = self
. roles
. items ( )
. iter ( )
. filter ( | ( _ , _ , v ) | v . 0. is_some ( ) )
. map ( | ( id , _ , _ ) | * id )
. collect ::< Vec < _ > > ( ) ;
expected_nodes . sort ( ) ;
let mut node_id_vec = self . node_id_vec . clone ( ) ;
node_id_vec . sort ( ) ;
if expected_nodes ! = node_id_vec {
return false ;
}
// Check that the assignation data has the correct length
if self . ring_assignation_data . len ( ) ! = ( 1 < < PARTITION_BITS ) * self . replication_factor {
return false ;
}
// Check that the assigned nodes are correct identifiers
// of nodes that are assigned a role
// and that role is not the role of a gateway nodes
for x in self . ring_assignation_data . iter ( ) {
if * x as usize > = self . node_id_vec . len ( ) {
return false ;
}
let node = self . node_id_vec [ * x as usize ] ;
match self . roles . get ( & node ) {
Some ( NodeRoleV ( Some ( x ) ) ) if x . capacity . is_some ( ) = > ( ) ,
_ = > return false ,
}
}
true
}
2022-09-21 14:39:59 +02:00
}
impl ClusterLayout {
2022-05-01 09:57:05 +02:00
/// This function calculates a new partition-to-node assignation.
2022-09-21 14:39:59 +02:00
/// The computed assignation respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a
2022-05-01 09:57:05 +02:00
/// partition (assuming all partitions have the same size).
/// Among such optimal assignation, it minimizes the distance to
/// the former assignation (if any) to minimize the amount of
2022-09-21 14:39:59 +02:00
/// data to be moved.
pub fn calculate_partition_assignation ( & mut self , replication :usize , redundancy :usize ) -> Result < Message , String > {
2022-05-01 09:57:05 +02:00
//The nodes might have been updated, some might have been deleted.
//So we need to first update the list of nodes and retrieve the
//assignation.
2022-09-21 14:39:59 +02:00
//We update the node ids, since the node list might have changed with the staged
//changes in the layout. We retrieve the old_assignation reframed with the new ids
let old_assignation_opt = self . update_node_id_vec ( ) ? ;
self . replication_factor = replication ;
self . zone_redundancy = redundancy ;
let mut msg = Message ::new ( ) ;
msg . push ( format! ( " Computation of a new cluster layout where partitions are
replicated { } times on at least { } distinct zones . " , replication, redundancy));
//We generate for once numerical ids for the zone, to use them as indices in the
//flow graphs.
let ( id_to_zone , zone_to_id ) = self . generate_zone_ids ( ) ? ;
msg . push ( format! ( " The cluster contains {} nodes spread over {} zones. " ,
self . useful_nodes ( ) . len ( ) , id_to_zone . len ( ) ) ) ;
//We compute the optimal partition size
let partition_size = self . compute_optimal_partition_size ( & zone_to_id ) ? ;
if old_assignation_opt ! = None {
msg . push ( format! ( " Given the replication and redundancy constraint, the
optimal size of a partition is { } . In the previous layout , it used to
be { } . " , partition_size, self.partition_size));
}
else {
msg . push ( format! ( " Given the replication and redundancy constraints, the
optimal size of a partition is { } . " , partition_size));
}
self . partition_size = partition_size ;
//We compute a first flow/assignment that is heuristically close to the previous
//assignment
let mut gflow = self . compute_candidate_assignment ( & zone_to_id , & old_assignation_opt ) ? ;
if let Some ( assoc ) = & old_assignation_opt {
//We minimize the distance to the previous assignment.
self . minimize_rebalance_load ( & mut gflow , & zone_to_id , & assoc ) ? ;
}
msg . append ( & mut self . output_stat ( & gflow , & old_assignation_opt , & zone_to_id , & id_to_zone ) ? ) ;
//We update the layout structure
self . update_ring_from_flow ( id_to_zone . len ( ) , & gflow ) ? ;
return Ok ( msg ) ;
}
2022-05-01 09:57:05 +02:00
/// The LwwMap of node roles might have changed. This function updates the node_id_vec
/// and returns the assignation given by ring, with the new indices of the nodes, and
2022-09-21 14:39:59 +02:00
/// None if the node is not present anymore.
2022-05-01 09:57:05 +02:00
/// We work with the assumption that only this function and calculate_new_assignation
/// do modify assignation_ring and node_id_vec.
2022-09-21 14:39:59 +02:00
fn update_node_id_vec ( & mut self ) -> Result < Option < Vec < Vec < usize > > > , String > {
// (1) We compute the new node list
//Non gateway nodes should be coded on 8bits, hence they must be first in the list
//We build the new node ids
let mut new_non_gateway_nodes : Vec < Uuid > = self . roles . items ( ) . iter ( )
. filter ( | ( _ , _ , v ) |
match & v . 0 { Some ( r ) if r . capacity ! = None = > true , _ = > false } )
. map ( | ( k , _ , _ ) | * k ) . collect ( ) ;
if new_non_gateway_nodes . len ( ) > MAX_NODE_NUMBER {
return Err ( format! ( " There are more than {} non-gateway nodes in the new layout. This is not allowed. " , MAX_NODE_NUMBER ) . to_string ( ) ) ;
}
let mut new_gateway_nodes : Vec < Uuid > = self . roles . items ( ) . iter ( )
. filter ( | ( _ , _ , v ) |
match v { NodeRoleV ( Some ( r ) ) if r . capacity = = None = > true , _ = > false } )
. map ( | ( k , _ , _ ) | * k ) . collect ( ) ;
let nb_useful_nodes = new_non_gateway_nodes . len ( ) ;
let mut new_node_id_vec = Vec ::< Uuid > ::new ( ) ;
new_node_id_vec . append ( & mut new_non_gateway_nodes ) ;
new_node_id_vec . append ( & mut new_gateway_nodes ) ;
// (2) We retrieve the old association
//We rewrite the old association with the new indices. We only consider partition
//to node assignations where the node is still in use.
let nb_partitions = 1 usize < < PARTITION_BITS ;
let mut old_assignation = vec! [ Vec ::< usize > ::new ( ) ; nb_partitions ] ;
if self . ring_assignation_data . len ( ) = = 0 {
//This is a new association
return Ok ( None ) ;
}
if self . ring_assignation_data . len ( ) ! = nb_partitions * self . replication_factor {
return Err ( " The old assignation does not have a size corresponding to the old replication factor or the number of partitions. " . to_string ( ) ) ;
}
//We build a translation table between the uuid and new ids
let mut uuid_to_new_id = HashMap ::< Uuid , usize > ::new ( ) ;
//We add the indices of only the new non-gateway nodes that can be used in the
//association ring
for i in 0 .. nb_useful_nodes {
uuid_to_new_id . insert ( new_node_id_vec [ i ] , i ) ;
}
let rf = self . replication_factor ;
for p in 0 .. nb_partitions {
for old_id in & self . ring_assignation_data [ p * rf .. ( p + 1 ) * rf ] {
let uuid = self . node_id_vec [ * old_id as usize ] ;
if uuid_to_new_id . contains_key ( & uuid ) {
old_assignation [ p ] . push ( uuid_to_new_id [ & uuid ] ) ;
}
}
}
//We write the results
self . node_id_vec = new_node_id_vec ;
self . ring_assignation_data = Vec ::< CompactNodeType > ::new ( ) ;
return Ok ( Some ( old_assignation ) ) ;
2022-05-01 09:57:05 +02:00
}
2022-09-21 14:39:59 +02:00
///This function generates ids for the zone of the nodes appearing in
///self.node_id_vec.
fn generate_zone_ids ( & self ) -> Result < ( Vec < String > , HashMap < String , usize > ) , String > {
let mut id_to_zone = Vec ::< String > ::new ( ) ;
let mut zone_to_id = HashMap ::< String , usize > ::new ( ) ;
for uuid in self . node_id_vec . iter ( ) {
if self . roles . get ( uuid ) = = None {
return Err ( " The uuid was not found in the node roles (this should not happen, it might be a critical error). " . to_string ( ) ) ;
}
match self . node_role ( & uuid ) {
Some ( r ) = > if ! zone_to_id . contains_key ( & r . zone ) & & r . capacity ! = None {
zone_to_id . insert ( r . zone . clone ( ) , id_to_zone . len ( ) ) ;
id_to_zone . push ( r . zone . clone ( ) ) ;
}
_ = > ( )
}
}
return Ok ( ( id_to_zone , zone_to_id ) ) ;
}
///This function computes by dichotomy the largest realizable partition size, given
///the layout.
fn compute_optimal_partition_size ( & self , zone_to_id : & HashMap < String , usize > ) -> Result < u32 , String > {
let nb_partitions = 1 usize < < PARTITION_BITS ;
let empty_set = HashSet ::< ( usize , usize ) > ::new ( ) ;
let mut g = self . generate_flow_graph ( 1 , zone_to_id , & empty_set ) ? ;
g . compute_maximal_flow ( ) ? ;
if g . get_flow_value ( ) ? < ( nb_partitions * self . replication_factor ) . try_into ( ) . unwrap ( ) {
return Err ( " The storage capacity of he cluster is to small. It is impossible to store partitions of size 1. " . to_string ( ) ) ;
}
let mut s_down = 1 ;
let mut s_up = self . get_total_capacity ( ) ? ;
while s_down + 1 < s_up {
g = self . generate_flow_graph ( ( s_down + s_up ) / 2 , zone_to_id , & empty_set ) ? ;
g . compute_maximal_flow ( ) ? ;
if g . get_flow_value ( ) ? < ( nb_partitions * self . replication_factor ) . try_into ( ) . unwrap ( ) {
s_up = ( s_down + s_up ) / 2 ;
}
else {
s_down = ( s_down + s_up ) / 2 ;
}
}
return Ok ( s_down ) ;
}
/// Lists the vertices of the flow graph: the source and the sink, then for
/// every partition its `Pup` and `Pdown` vertices followed by one `PZ` vertex
/// per zone, and finally one `N` vertex per node.
fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> {
    let mut vertices = Vec::new();
    vertices.push(Vertex::Source);
    vertices.push(Vertex::Sink);

    for partition in 0..NB_PARTITIONS {
        vertices.push(Vertex::Pup(partition));
        vertices.push(Vertex::Pdown(partition));
        vertices.extend((0..nb_zones).map(|zone| Vertex::PZ(partition, zone)));
    }
    vertices.extend((0..nb_nodes).map(Vertex::N));

    vertices
}
/// Builds the flow graph of the assignation problem for partitions of size `size`.
///
/// - The source sends `zone_redundancy` units to `Pup(p)` and the remaining
///   `replication_factor - zone_redundancy` units to `Pdown(p)`.
/// - `Pup(p)` can send at most one unit to each zone vertex `PZ(p, z)`, while
///   `Pdown(p)` can send up to `replication_factor` units to each of them.
/// - `PZ(p, z)` is linked with capacity 1 to every storage node of zone `z`,
///   except for the (partition, node) pairs listed in `exclude_assoc`.
/// - Every node is linked to the sink with capacity `node capacity / size`.
///
/// # Errors
/// Fails when `size` is zero, when the zone redundancy exceeds the replication
/// factor, when the zone of a storage node has no id in `zone_to_id`, and
/// propagates the errors of the node lookups and of `add_edge`.
fn generate_flow_graph(
    &self,
    size: u32,
    zone_to_id: &HashMap<String, usize>,
    exclude_assoc: &HashSet<(usize, usize)>,
) -> Result<Graph<FlowEdge>, String> {
    // Guard the two arithmetic traps of this function: the division by the
    // partition size and the subtraction of the redundancy.
    if size == 0 {
        return Err("The partition size must be positive to build the flow graph.".to_string());
    }
    let unconstrained_copies = self
        .replication_factor
        .checked_sub(self.zone_redundancy)
        .ok_or_else(|| {
            format!(
                "The zone redundancy ({}) cannot exceed the replication factor ({}).",
                self.zone_redundancy, self.replication_factor
            )
        })?;

    let nb_zones = zone_to_id.len();
    // Storage nodes are the first entries of node_id_vec.
    let nb_nodes = self.useful_nodes().len();
    let vertices = ClusterLayout::generate_graph_vertices(nb_zones, nb_nodes);
    let mut g = Graph::<FlowEdge>::new(&vertices);

    for p in 0..NB_PARTITIONS {
        g.add_edge(Vertex::Source, Vertex::Pup(p), self.zone_redundancy as u32)?;
        g.add_edge(Vertex::Source, Vertex::Pdown(p), unconstrained_copies as u32)?;
        for z in 0..nb_zones {
            g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
            g.add_edge(
                Vertex::Pdown(p),
                Vertex::PZ(p, z),
                self.replication_factor as u32,
            )?;
        }
    }

    for n in 0..nb_nodes {
        let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
        let zone_name = self.get_node_zone(&self.node_id_vec[n])?;
        let node_zone = *zone_to_id.get(&zone_name).ok_or_else(|| {
            format!("The zone {} of a storage node has no numerical id.", zone_name)
        })?;
        g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
        for p in 0..NB_PARTITIONS {
            if !exclude_assoc.contains(&(p, n)) {
                g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
            }
        }
    }

    Ok(g)
}
/// Computes a first assignation that favours the (partition, node) pairs of
/// the previous assignation, if any.
///
/// The flow is first maximized on a graph restricted to the pairs used by the
/// old assignation; the other pairs are then added back and the flow is
/// maximized again.
// NOTE(review): this presumably relies on `compute_maximal_flow` extending the
// flow already present in the graph; confirm against graph_algo.
fn compute_candidate_assignment(
    &self,
    zone_to_id: &HashMap<String, usize>,
    old_assoc_opt: &Option<Vec<Vec<usize>>>,
) -> Result<Graph<FlowEdge>, String> {
    // Pairs (partition, node) that the old association did not use.
    let mut unused_pairs = HashSet::<(usize, usize)>::new();
    if let Some(old_assoc) = old_assoc_opt {
        let storage_node_count = self.useful_nodes().len();
        for partition in 0..NB_PARTITIONS {
            for node in 0..storage_node_count {
                unused_pairs.insert((partition, node));
            }
            for &node in &old_assoc[partition] {
                unused_pairs.remove(&(partition, node));
            }
        }
    }

    // Best flow using only the pairs of the old association.
    let mut graph = self.generate_flow_graph(self.partition_size, zone_to_id, &unused_pairs)?;
    graph.compute_maximal_flow()?;

    // The remaining pairs become available for the second pass.
    for &(partition, node) in unused_pairs.iter() {
        let zone_name = self.get_node_zone(&self.node_id_vec[node])?;
        let zone = zone_to_id[&zone_name];
        graph.add_edge(Vertex::PZ(partition, zone), Vertex::N(node), 1)?;
    }
    graph.compute_maximal_flow()?;

    Ok(graph)
}
/// Optimizes `gflow` so that it stays as close as possible to the old
/// assignation: every edge from a partition-zone vertex to a node that the
/// old assignation used gets a cost of -1, and the flow is then optimized
/// for that cost with a path length bound of four times the number of
/// storage nodes.
fn minimize_rebalance_load(
    &self,
    gflow: &mut Graph<FlowEdge>,
    zone_to_id: &HashMap<String, usize>,
    old_assoc: &Vec<Vec<usize>>,
) -> Result<(), String> {
    let mut cost = CostFunction::new();
    for partition in 0..NB_PARTITIONS {
        for &node in &old_assoc[partition] {
            let zone_name = self.get_node_zone(&self.node_id_vec[node])?;
            let zone = zone_to_id[&zone_name];
            cost.insert((Vertex::PZ(partition, zone), Vertex::N(node)), -1);
        }
    }

    let max_path_length = 4 * self.useful_nodes().len();
    gflow.optimize_flow_with_cost(&cost, max_path_length)?;
    Ok(())
}
fn update_ring_from_flow ( & mut self , nb_zones : usize , gflow : & Graph < FlowEdge > ) -> Result < ( ) , String > {
self . ring_assignation_data = Vec ::< CompactNodeType > ::new ( ) ;
for p in 0 .. NB_PARTITIONS {
for z in 0 .. nb_zones {
let assoc_vertex = gflow . get_positive_flow_from ( Vertex ::PZ ( p , z ) ) ? ;
for vertex in assoc_vertex . iter ( ) {
match vertex {
Vertex ::N ( n ) = > self . ring_assignation_data . push ( ( * n ) . try_into ( ) . unwrap ( ) ) ,
_ = > ( )
}
}
}
}
if self . ring_assignation_data . len ( ) ! = NB_PARTITIONS * self . replication_factor {
return Err ( " Critical Error : the association ring we produced does not have the right size. " . to_string ( ) ) ;
}
return Ok ( ( ) ) ;
}
//This function returns a message summing up the partition repartition of the new
//layout.
fn output_stat ( & self , gflow : & Graph < FlowEdge > ,
old_assoc_opt : & Option < Vec < Vec < usize > > > ,
zone_to_id : & HashMap < String , usize > ,
id_to_zone : & Vec < String > ) -> Result < Message , String > {
let mut msg = Message ::new ( ) ;
let nb_partitions = 1 usize < < PARTITION_BITS ;
let used_cap = self . partition_size * nb_partitions as u32 *
self . replication_factor as u32 ;
let total_cap = self . get_total_capacity ( ) ? ;
let percent_cap = 100.0 * ( used_cap as f32 ) / ( total_cap as f32 ) ;
msg . push ( format! ( " Available capacity / Total cluster capacity: {} / {} ( {:.1} %) " ,
used_cap , total_cap , percent_cap ) ) ;
msg . push ( format! ( " If the percentage is to low, it might be that the replication/redundancy constraints force the use of nodes/zones with small storage capacities.
You might want to rebalance the storage capacities or relax the constraints . See the detailed statistics below and look for saturated nodes / zones . " ));
msg . push ( format! ( " Recall that because of the replication, the actual available storage capacity is {} / {} = {} . " , used_cap , self . replication_factor , used_cap / self . replication_factor as u32 ) ) ;
//We define and fill in the following tables
let storing_nodes = self . useful_nodes ( ) ;
let mut new_partitions = vec! [ 0 ; storing_nodes . len ( ) ] ;
let mut stored_partitions = vec! [ 0 ; storing_nodes . len ( ) ] ;
let mut new_partitions_zone = vec! [ 0 ; id_to_zone . len ( ) ] ;
let mut stored_partitions_zone = vec! [ 0 ; id_to_zone . len ( ) ] ;
for p in 0 .. nb_partitions {
for z in 0 .. id_to_zone . len ( ) {
let pz_nodes = gflow . get_positive_flow_from ( Vertex ::PZ ( p , z ) ) ? ;
if pz_nodes . len ( ) > 0 {
stored_partitions_zone [ z ] + = 1 ;
}
for vert in pz_nodes . iter ( ) {
if let Vertex ::N ( n ) = * vert {
stored_partitions [ n ] + = 1 ;
if let Some ( old_assoc ) = old_assoc_opt {
if ! old_assoc [ p ] . contains ( & n ) {
new_partitions [ n ] + = 1 ;
}
}
}
}
if let Some ( old_assoc ) = old_assoc_opt {
let mut old_zones_of_p = Vec ::< usize > ::new ( ) ;
for n in old_assoc [ p ] . iter ( ) {
old_zones_of_p . push (
zone_to_id [ & self . get_node_zone ( & self . node_id_vec [ * n ] ) ? ] ) ;
}
if ! old_zones_of_p . contains ( & z ) {
new_partitions_zone [ z ] + = 1 ;
}
}
}
}
//We display the statistics
if * old_assoc_opt ! = None {
let total_new_partitions : usize = new_partitions . iter ( ) . sum ( ) ;
msg . push ( format! ( " A total of {} new copies of partitions need to be \
transferred . " , total_new_partitions));
}
msg . push ( format! ( " " ) ) ;
msg . push ( format! ( " Detailed statistics by zones and nodes. " ) ) ;
for z in 0 .. id_to_zone . len ( ) {
let mut nodes_of_z = Vec ::< usize > ::new ( ) ;
for n in 0 .. storing_nodes . len ( ) {
if self . get_node_zone ( & self . node_id_vec [ n ] ) ? = = id_to_zone [ z ] {
nodes_of_z . push ( n ) ;
}
}
let replicated_partitions : usize = nodes_of_z . iter ( )
. map ( | n | stored_partitions [ * n ] ) . sum ( ) ;
msg . push ( format! ( " " ) ) ;
if * old_assoc_opt ! = None {
msg . push ( format! ( " Zone {} : {} distinct partitions stored ( {} new, \
{ } partition copies ) " , id_to_zone[z], stored_partitions_zone[z],
new_partitions_zone [ z ] , replicated_partitions ) ) ;
}
else {
msg . push ( format! ( " Zone {} : {} distinct partitions stored ( {} partition \
copies ) " ,
id_to_zone [ z ] , stored_partitions_zone [ z ] , replicated_partitions ) ) ;
}
let available_cap_z : u32 = self . partition_size * replicated_partitions as u32 ;
let mut total_cap_z = 0 ;
for n in nodes_of_z . iter ( ) {
total_cap_z + = self . get_node_capacity ( & self . node_id_vec [ * n ] ) ? ;
}
let percent_cap_z = 100.0 * ( available_cap_z as f32 ) / ( total_cap_z as f32 ) ;
msg . push ( format! ( " Available capacity / Total capacity: {} / {} ( {:.1} %). " ,
available_cap_z , total_cap_z , percent_cap_z ) ) ;
msg . push ( format! ( " " ) ) ;
for n in nodes_of_z . iter ( ) {
let available_cap_n = stored_partitions [ * n ] as u32 * self . partition_size ;
let total_cap_n = self . get_node_capacity ( & self . node_id_vec [ * n ] ) ? ;
let tags_n = ( self . node_role ( & self . node_id_vec [ * n ] )
. ok_or ( " Node not found. " ) ) ? . tags_string ( ) ;
msg . push ( format! ( " Node {} : {} partitions ( {} new) ; \
available / total capacity : { } / { } ( { :. 1 } % ) ; tags :{ } " ,
& self . node_id_vec [ * n ] . to_vec ( ) . encode_hex ::< String > ( ) ,
stored_partitions [ * n ] ,
new_partitions [ * n ] , available_cap_n , total_cap_n ,
( available_cap_n as f32 ) / ( total_cap_n as f32 ) * 100.0 ,
tags_n ) ) ;
}
}
return Ok ( msg ) ;
}
2022-05-01 09:57:05 +02:00
}
2021-11-09 12:24:04 +01:00
2022-09-21 14:39:59 +02:00
//====================================================================================
2022-05-01 09:54:19 +02:00
#[ cfg(test) ]
mod tests {
2022-05-01 09:57:05 +02:00
use super ::* ;
use itertools ::Itertools ;
// Test helper: asserts that the ring of `cl` is a well-formed assignation where
// every partition is spread over `replication_factor` distinct zones, and that
// no node could take one more partition with a better capacity/partitions ratio
// than the worst ratio of the layout.
// NOTE(review): `get_node_zone_capacity` is not defined in the visible part of
// this file; it is used here as returning two tables (zone and capacity)
// indexed by node index - confirm that it still exists.
fn check_assignation ( cl : & ClusterLayout ) {
//Check that input data has the right format
let nb_partitions = 1 usize < < PARTITION_BITS ;
assert! ( [ 1 , 2 , 3 ] . contains ( & cl . replication_factor ) ) ;
assert! ( cl . ring_assignation_data . len ( ) = = nb_partitions * cl . replication_factor ) ;
let ( node_zone , node_capacity ) = cl . get_node_zone_capacity ( ) ;
//Check that it is a correct assignation with zone redundancy:
//the nodes of each partition must all belong to distinct zones.
let rf = cl . replication_factor ;
for i in 0 .. nb_partitions {
assert! (
rf = = cl . ring_assignation_data [ rf * i .. rf * ( i + 1 ) ]
. iter ( )
. map ( | nod | node_zone [ * nod as usize ] . clone ( ) )
. unique ( )
. count ( )
) ;
}
let nb_nodes = cl . node_id_vec . len ( ) ;
//Check optimality
//node_nb_part[i] is the number of ring entries assigned to node i
let node_nb_part = ( 0 .. nb_nodes )
. map ( | i | {
cl . ring_assignation_data
. iter ( )
. filter ( | x | * * x = = i as u8 )
. count ( )
} )
. collect ::< Vec < _ > > ( ) ;
//zone_nb_part[k] is the number of ring entries assigned to a node of zone_vec[k]
let zone_vec = node_zone . iter ( ) . unique ( ) . collect ::< Vec < _ > > ( ) ;
let zone_nb_part = zone_vec
. iter ( )
. map ( | z | {
cl . ring_assignation_data
. iter ( )
. filter ( | x | node_zone [ * * x as usize ] = = * * z )
. count ( )
} )
. collect ::< Vec < _ > > ( ) ;
//Check optimality of the zone assignation : would it be better for the
//node_capacity/node_partitions ratio to change the assignation of a partition
//idmin is the node with the smallest capacity/partitions ratio (the ratios are
//compared by cross-multiplication to stay in integer arithmetic).
if let Some ( idmin ) = ( 0 .. nb_nodes ) . min_by ( | i , j | {
( node_capacity [ * i ] * node_nb_part [ * j ] as u32 )
. cmp ( & ( node_capacity [ * j ] * node_nb_part [ * i ] as u32 ) )
} ) {
//idnew is, among the nodes whose zone holds less than nb_partitions ring
//entries, the one with the best ratio after receiving one more partition.
if let Some ( idnew ) = ( 0 .. nb_nodes )
. filter ( | i | {
if let Some ( p ) = zone_vec . iter ( ) . position ( | z | * * z = = node_zone [ * i ] ) {
zone_nb_part [ p ] < nb_partitions
} else {
false
}
} )
. max_by ( | i , j | {
( node_capacity [ * i ] * ( node_nb_part [ * j ] as u32 + 1 ) )
. cmp ( & ( node_capacity [ * j ] * ( node_nb_part [ * i ] as u32 + 1 ) ) )
} ) {
//Giving one more partition to idnew must not beat the ratio of idmin
assert! (
node_capacity [ idmin ] * ( node_nb_part [ idnew ] as u32 + 1 )
> = node_capacity [ idnew ] * node_nb_part [ idmin ] as u32
) ;
}
}
//In every zone, check optimality of the node assignation
for z in zone_vec {
let node_of_z_iter = ( 0 .. nb_nodes ) . filter ( | id | node_zone [ * id ] = = * z ) ;
if let Some ( idmin ) = node_of_z_iter . clone ( ) . min_by ( | i , j | {
( node_capacity [ * i ] * node_nb_part [ * j ] as u32 )
. cmp ( & ( node_capacity [ * j ] * node_nb_part [ * i ] as u32 ) )
} ) {
// NOTE(review): the candidate is picked with min_by here, while the
// cluster-wide check above uses max_by - confirm this is intended.
if let Some ( idnew ) = node_of_z_iter . min_by ( | i , j | {
( node_capacity [ * i ] * ( node_nb_part [ * j ] as u32 + 1 ) )
. cmp ( & ( node_capacity [ * j ] * ( node_nb_part [ * i ] as u32 + 1 ) ) )
} ) {
assert! (
node_capacity [ idmin ] * ( node_nb_part [ idnew ] as u32 + 1 )
> = node_capacity [ idnew ] * node_nb_part [ idmin ] as u32
) ;
}
}
}
}
fn update_layout (
cl : & mut ClusterLayout ,
node_id_vec : & Vec < u8 > ,
node_capacity_vec : & Vec < u32 > ,
node_zone_vec : & Vec < String > ,
) {
for i in 0 .. node_id_vec . len ( ) {
if let Some ( x ) = FixedBytes32 ::try_from ( & [ i as u8 ; 32 ] ) {
cl . node_id_vec . push ( x ) ;
}
2021-11-09 12:24:04 +01:00
2022-05-01 09:57:05 +02:00
let update = cl . roles . update_mutator (
cl . node_id_vec [ i ] ,
NodeRoleV ( Some ( NodeRole {
zone : ( node_zone_vec [ i ] . to_string ( ) ) ,
capacity : ( Some ( node_capacity_vec [ i ] ) ) ,
tags : ( vec! [ ] ) ,
} ) ) ,
) ;
cl . roles . merge ( & update ) ;
}
}
// Computes the layout of a 3-node cluster, then of a 9-node cluster whose
// capacities are changed twice, and checks the assignation after every step.
// Partitions are replicated 3 times over 3 distinct zones, which is what
// `check_assignation` asserts.
#[test]
fn test_assignation() {
    let mut node_id_vec = vec![1, 2, 3];
    let mut node_capacity_vec = vec![4000, 1000, 2000];
    let mut node_zone_vec = vec!["A", "B", "C"]
        .into_iter()
        .map(|x| x.to_string())
        .collect();

    let mut cl = ClusterLayout {
        node_id_vec: vec![],

        roles: LwwMap::new(),

        replication_factor: 3,
        zone_redundancy: 3,
        partition_size: 0,
        ring_assignation_data: vec![],
        version: 0,
        staging: LwwMap::new(),
        staging_hash: sha256sum(&[1; 32]),
    };
    update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
    cl.calculate_partition_assignation(3, 3).unwrap();
    check_assignation(&cl);

    node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
    node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
    node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
        .into_iter()
        .map(|x| x.to_string())
        .collect();
    update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
    cl.calculate_partition_assignation(3, 3).unwrap();
    check_assignation(&cl);

    node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
    update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
    cl.calculate_partition_assignation(3, 3).unwrap();
    check_assignation(&cl);

    node_capacity_vec = vec![4000, 4000, 2000, 7000, 1000, 9000, 2000, 10, 2000];
    update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
    cl.calculate_partition_assignation(3, 3).unwrap();
    check_assignation(&cl);
}
}