Garage v0.9 #473
2 changed files with 132 additions and 109 deletions
|
@ -6,10 +6,10 @@ use std::cmp::{max, min};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
//Vertex data structures used in all the graphs used in layout.rs.
|
///Vertex data structures used in all the graphs used in layout.rs.
|
||||||
//usize parameters correspond to node/zone/partitions ids.
|
///usize parameters correspond to node/zone/partitions ids.
|
||||||
//To understand the vertex roles below, please refer to the formal description
|
///To understand the vertex roles below, please refer to the formal description
|
||||||
//of the layout computation algorithm.
|
///of the layout computation algorithm.
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
pub enum Vertex {
|
pub enum Vertex {
|
||||||
Source,
|
Source,
|
||||||
|
@ -20,8 +20,7 @@ pub enum Vertex {
|
||||||
Sink,
|
Sink,
|
||||||
}
|
}
|
||||||
|
|
||||||
//Edge data structure for the flow algorithm.
|
///Edge data structure for the flow algorithm.
|
||||||
//The graph is stored as an adjacency list
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct FlowEdge {
|
pub struct FlowEdge {
|
||||||
cap: u32, //flow maximal capacity of the edge
|
cap: u32, //flow maximal capacity of the edge
|
||||||
|
@ -30,8 +29,7 @@ pub struct FlowEdge {
|
||||||
rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v
|
rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v
|
||||||
}
|
}
|
||||||
|
|
||||||
//Edge data structure for the detection of negative cycles.
|
///Edge data structure for the detection of negative cycles.
|
||||||
//The graph is stored as a list of edges (u,v).
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct WeightedEdge {
|
pub struct WeightedEdge {
|
||||||
w: i32, //weight of the edge
|
w: i32, //weight of the edge
|
||||||
|
@ -42,13 +40,14 @@ pub trait Edge: Clone + Copy {}
|
||||||
impl Edge for FlowEdge {}
|
impl Edge for FlowEdge {}
|
||||||
impl Edge for WeightedEdge {}
|
impl Edge for WeightedEdge {}
|
||||||
|
|
||||||
//Struct for the graph structure. We do encapsulation here to be able to both
|
///Struct for the graph structure. We do encapsulation here to be able to both
|
||||||
//provide user friendly Vertex enum to address vertices, and to use usize indices
|
///provide user friendly Vertex enum to address vertices, and to use internally usize
|
||||||
//and Vec instead of HashMap in the graph algorithm to optimize execution speed.
|
///indices and Vec instead of HashMap in the graph algorithm to optimize execution speed.
|
||||||
pub struct Graph<E: Edge> {
|
pub struct Graph<E: Edge> {
|
||||||
vertextoid: HashMap<Vertex, usize>,
|
vertextoid: HashMap<Vertex, usize>,
|
||||||
idtovertex: Vec<Vertex>,
|
idtovertex: Vec<Vertex>,
|
||||||
|
|
||||||
|
//The graph is stored as an adjacency list
|
||||||
graph: Vec<Vec<E>>,
|
graph: Vec<Vec<E>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,8 +68,8 @@ impl<E: Edge> Graph<E> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Graph<FlowEdge> {
|
impl Graph<FlowEdge> {
|
||||||
//This function adds a directed edge to the graph with capacity c, and the
|
///This function adds a directed edge to the graph with capacity c, and the
|
||||||
//corresponding reversed edge with capacity 0.
|
///corresponding reversed edge with capacity 0.
|
||||||
pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
|
pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
return Err("The graph does not contain the provided vertex.".to_string());
|
||||||
|
@ -94,8 +93,8 @@ impl Graph<FlowEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function returns the list of vertices that receive a positive flow from
|
///This function returns the list of vertices that receive a positive flow from
|
||||||
//vertex v.
|
///vertex v.
|
||||||
pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
|
pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
if !self.vertextoid.contains_key(&v) {
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
return Err("The graph does not contain the provided vertex.".to_string());
|
||||||
|
@ -110,7 +109,7 @@ impl Graph<FlowEdge> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function returns the value of the flow incoming to v.
|
///This function returns the value of the flow incoming to v.
|
||||||
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
|
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
if !self.vertextoid.contains_key(&v) {
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
return Err("The graph does not contain the provided vertex.".to_string());
|
||||||
|
@ -123,7 +122,7 @@ impl Graph<FlowEdge> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function returns the value of the flow outgoing from v.
|
///This function returns the value of the flow outgoing from v.
|
||||||
pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
|
pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
if !self.vertextoid.contains_key(&v) {
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
return Err("The graph does not contain the provided vertex.".to_string());
|
||||||
|
@ -136,14 +135,14 @@ impl Graph<FlowEdge> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function computes the flow total value by computing the outgoing flow
|
///This function computes the flow total value by computing the outgoing flow
|
||||||
//from the source.
|
///from the source.
|
||||||
pub fn get_flow_value(&mut self) -> Result<i32, String> {
|
pub fn get_flow_value(&mut self) -> Result<i32, String> {
|
||||||
self.get_outflow(Vertex::Source)
|
self.get_outflow(Vertex::Source)
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function shuffles the order of the edge lists. It keeps the ids of the
|
///This function shuffles the order of the edge lists. It keeps the ids of the
|
||||||
//reversed edges consistent.
|
///reversed edges consistent.
|
||||||
fn shuffle_edges(&mut self) {
|
fn shuffle_edges(&mut self) {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
for i in 0..self.graph.len() {
|
for i in 0..self.graph.len() {
|
||||||
|
@ -157,7 +156,7 @@ impl Graph<FlowEdge> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Computes an upper bound of the flow n the graph
|
///Computes an upper bound of the flow on the graph
|
||||||
pub fn flow_upper_bound(&self) -> u32 {
|
pub fn flow_upper_bound(&self) -> u32 {
|
||||||
let idsource = self.vertextoid[&Vertex::Source];
|
let idsource = self.vertextoid[&Vertex::Source];
|
||||||
let mut flow_upper_bound = 0;
|
let mut flow_upper_bound = 0;
|
||||||
|
@ -167,9 +166,9 @@ impl Graph<FlowEdge> {
|
||||||
flow_upper_bound
|
flow_upper_bound
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function computes the maximal flow using Dinic's algorithm. It starts with
|
///This function computes the maximal flow using Dinic's algorithm. It starts with
|
||||||
//the flow values already present in the graph. So it is possible to add some edge to
|
///the flow values already present in the graph. So it is possible to add some edge to
|
||||||
//the graph, compute a flow, add other edges, update the flow.
|
///the graph, compute a flow, add other edges, update the flow.
|
||||||
pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
|
pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&Vertex::Source) {
|
if !self.vertextoid.contains_key(&Vertex::Source) {
|
||||||
return Err("The graph does not contain a source.".to_string());
|
return Err("The graph does not contain a source.".to_string());
|
||||||
|
@ -270,11 +269,11 @@ impl Graph<FlowEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function takes a flow, and a cost function on the edges, and tries to find an
|
///This function takes a flow, and a cost function on the edges, and tries to find an
|
||||||
// equivalent flow with a better cost, by finding improving overflow cycles. It uses
|
/// equivalent flow with a better cost, by finding improving overflow cycles. It uses
|
||||||
// as subroutine the Bellman Ford algorithm run up to path_length.
|
/// as subroutine the Bellman Ford algorithm run up to path_length.
|
||||||
// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and only
|
/// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
|
||||||
// one needs to be present in the cost function.
|
/// only one needs to be present in the cost function.
|
||||||
pub fn optimize_flow_with_cost(
|
pub fn optimize_flow_with_cost(
|
||||||
&mut self,
|
&mut self,
|
||||||
cost: &CostFunction,
|
cost: &CostFunction,
|
||||||
|
@ -309,7 +308,7 @@ impl Graph<FlowEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//Construct the weighted graph G_f from the flow and the cost function
|
///Construct the weighted graph G_f from the flow and the cost function
|
||||||
fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
|
fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
|
||||||
let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
|
let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
|
||||||
let nb_vertices = self.idtovertex.len();
|
let nb_vertices = self.idtovertex.len();
|
||||||
|
@ -334,7 +333,7 @@ impl Graph<FlowEdge> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Graph<WeightedEdge> {
|
impl Graph<WeightedEdge> {
|
||||||
//This function adds a single directed weighted edge to the graph.
|
///This function adds a single directed weighted edge to the graph.
|
||||||
pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
|
pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
return Err("The graph does not contain the provided vertex.".to_string());
|
||||||
|
@ -345,12 +344,12 @@ impl Graph<WeightedEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function lists the negative cycles it manages to find after path_length
|
///This function lists the negative cycles it manages to find after path_length
|
||||||
//iterations of the main loop of the Bellman-Ford algorithm. For the classical
|
///iterations of the main loop of the Bellman-Ford algorithm. For the classical
|
||||||
//algorithm, path_length needs to be equal to the number of vertices. However,
|
///algorithm, path_length needs to be equal to the number of vertices. However,
|
||||||
//for particular graph structures like our case, the algorithm is still correct
|
///for particular graph structures like in our case, the algorithm is still correct
|
||||||
//when path_length is the length of the longest possible simple path.
|
///when path_length is the length of the longest possible simple path.
|
||||||
//See the formal description of the algorithm for more details.
|
///See the formal description of the algorithm for more details.
|
||||||
fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
|
fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
|
||||||
let nb_vertices = self.graph.len();
|
let nb_vertices = self.graph.len();
|
||||||
|
|
||||||
|
@ -384,8 +383,8 @@ impl Graph<WeightedEdge> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function returns the list of cycles of a directed 1 forest. It does not
|
///This function returns the list of cycles of a directed 1 forest. It does not
|
||||||
//check for the consistency of the input.
|
///check for the consistency of the input.
|
||||||
fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
|
fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
|
||||||
let mut cycles = Vec::<Vec<usize>>::new();
|
let mut cycles = Vec::<Vec<usize>>::new();
|
||||||
let mut time_of_discovery = vec![None; forest.len()];
|
let mut time_of_discovery = vec![None; forest.len()];
|
||||||
|
|
|
@ -17,6 +17,8 @@ use crate::ring::*;
|
||||||
|
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
|
||||||
|
|
||||||
//The Message type will be used to collect information on the algorithm.
|
//The Message type will be used to collect information on the algorithm.
|
||||||
type Message = Vec<String>;
|
type Message = Vec<String>;
|
||||||
|
|
||||||
|
@ -28,9 +30,11 @@ pub struct ClusterLayout {
|
||||||
|
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
|
|
||||||
//This attribute is only used to retain the previously computed partition size,
|
///This attribute is only used to retain the previously computed partition size,
|
||||||
//to know to what extent does it change with the layout update.
|
///to know to what extent does it change with the layout update.
|
||||||
pub partition_size: u32,
|
pub partition_size: u32,
|
||||||
|
///Parameters used to compute the assignation currently given by
|
||||||
|
///ring_assignation_data
|
||||||
pub parameters: LayoutParameters,
|
pub parameters: LayoutParameters,
|
||||||
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
@ -48,8 +52,9 @@ pub struct ClusterLayout {
|
||||||
#[serde(with = "serde_bytes")]
|
#[serde(with = "serde_bytes")]
|
||||||
pub ring_assignation_data: Vec<CompactNodeType>,
|
pub ring_assignation_data: Vec<CompactNodeType>,
|
||||||
|
|
||||||
/// Role changes which are staged for the next version of the layout
|
/// Parameters to be used in the next partition assignation computation.
|
||||||
pub staged_parameters: Lww<LayoutParameters>,
|
pub staged_parameters: Lww<LayoutParameters>,
|
||||||
|
/// Role changes which are staged for the next version of the layout
|
||||||
pub staging: LwwMap<Uuid, NodeRoleV>,
|
pub staging: LwwMap<Uuid, NodeRoleV>,
|
||||||
pub staging_hash: Hash,
|
pub staging_hash: Hash,
|
||||||
}
|
}
|
||||||
|
@ -65,8 +70,6 @@ impl AutoCrdt for LayoutParameters {
|
||||||
const WARN_IF_DIFFERENT: bool = true;
|
const WARN_IF_DIFFERENT: bool = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
|
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NodeRoleV(pub Option<NodeRole>);
|
pub struct NodeRoleV(pub Option<NodeRole>);
|
||||||
|
|
||||||
|
@ -77,12 +80,13 @@ impl AutoCrdt for NodeRoleV {
|
||||||
/// The user-assigned roles of cluster nodes
|
/// The user-assigned roles of cluster nodes
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NodeRole {
|
pub struct NodeRole {
|
||||||
/// Datacenter at which this entry belong. This information might be used to perform a better
|
/// Datacenter at which this entry belong. This information is used to
|
||||||
/// geodistribution
|
/// perform a better geodistribution
|
||||||
pub zone: String,
|
pub zone: String,
|
||||||
/// The (relative) capacity of the node
|
/// The capacity of the node
|
||||||
/// If this is set to None, the node does not participate in storing data for the system
|
/// If this is set to None, the node does not participate in storing data for the system
|
||||||
/// and is only active as an API gateway to other nodes
|
/// and is only active as an API gateway to other nodes
|
||||||
|
// TODO : change the capacity to u64 and use byte unit input/output
|
||||||
pub capacity: Option<u32>,
|
pub capacity: Option<u32>,
|
||||||
/// A set of tags to recognize the node
|
/// A set of tags to recognize the node
|
||||||
pub tags: Vec<String>,
|
pub tags: Vec<String>,
|
||||||
|
@ -110,6 +114,7 @@ impl NodeRole {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
pub fn new(replication_factor: usize) -> Self {
|
pub fn new(replication_factor: usize) -> Self {
|
||||||
//We set the default zone redundancy to be equal to the replication factor,
|
//We set the default zone redundancy to be equal to the replication factor,
|
||||||
|
@ -231,7 +236,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
|
|
||||||
///Returns the uuids of the non_gateway nodes in self.node_id_vec.
|
///Returns the uuids of the non_gateway nodes in self.node_id_vec.
|
||||||
pub fn useful_nodes(&self) -> Vec<Uuid> {
|
pub fn nongateway_nodes(&self) -> Vec<Uuid> {
|
||||||
let mut result = Vec::<Uuid>::new();
|
let mut result = Vec::<Uuid>::new();
|
||||||
for uuid in self.node_id_vec.iter() {
|
for uuid in self.node_id_vec.iter() {
|
||||||
match self.node_role(uuid) {
|
match self.node_role(uuid) {
|
||||||
|
@ -291,13 +296,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
///Returns the sum of capacities of non gateway nodes in the cluster
|
///Returns the sum of capacities of non gateway nodes in the cluster
|
||||||
pub fn get_total_capacity(&self) -> Result<u32, Error> {
|
pub fn get_total_capacity(&self) -> Result<u32, Error> {
|
||||||
let mut total_capacity = 0;
|
let mut total_capacity = 0;
|
||||||
for uuid in self.useful_nodes().iter() {
|
for uuid in self.nongateway_nodes().iter() {
|
||||||
total_capacity += self.get_node_capacity(uuid)?;
|
total_capacity += self.get_node_capacity(uuid)?;
|
||||||
}
|
}
|
||||||
Ok(total_capacity)
|
Ok(total_capacity)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check a cluster layout for internal consistency
|
/// Check a cluster layout for internal consistency
|
||||||
|
/// (assignation, roles, parameters, partition size)
|
||||||
/// returns true if consistent, false if error
|
/// returns true if consistent, false if error
|
||||||
pub fn check(&self) -> bool {
|
pub fn check(&self) -> bool {
|
||||||
// Check that the hash of the staging data is correct
|
// Check that the hash of the staging data is correct
|
||||||
|
@ -377,7 +383,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
//Check that the partition size stored is the one computed by the asignation
|
//Check that the partition size stored is the one computed by the asignation
|
||||||
//algorithm.
|
//algorithm.
|
||||||
let cl2 = self.clone();
|
let cl2 = self.clone();
|
||||||
let (_, zone_to_id) = cl2.generate_useful_zone_ids().expect("Critical Error");
|
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
|
||||||
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
||||||
Ok(s) if s != self.partition_size => return false,
|
Ok(s) if s != self.partition_size => return false,
|
||||||
Err(_) => return false,
|
Err(_) => return false,
|
||||||
|
@ -388,6 +394,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Implementation of the ClusterLayout methods related to the assignation algorithm.
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
/// This function calculates a new partition-to-node assignation.
|
/// This function calculates a new partition-to-node assignation.
|
||||||
/// The computed assignation respects the node replication factor
|
/// The computed assignation respects the node replication factor
|
||||||
|
@ -397,16 +404,13 @@ impl ClusterLayout {
|
||||||
/// the former assignation (if any) to minimize the amount of
|
/// the former assignation (if any) to minimize the amount of
|
||||||
/// data to be moved.
|
/// data to be moved.
|
||||||
// Staged role changes must be merged with nodes roles before calling this function,
|
// Staged role changes must be merged with nodes roles before calling this function,
|
||||||
// hence it must only be called from apply_staged_changes() and it is not public.
|
// hence it must only be called from apply_staged_changes() and hence is not public.
|
||||||
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
|
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
|
||||||
//The nodes might have been updated, some might have been deleted.
|
|
||||||
//So we need to first update the list of nodes and retrieve the
|
|
||||||
//assignation.
|
|
||||||
|
|
||||||
//We update the node ids, since the node role list might have changed with the
|
//We update the node ids, since the node role list might have changed with the
|
||||||
//changes in the layout. We retrieve the old_assignation reframed with the new ids
|
//changes in the layout. We retrieve the old_assignation reframed with new ids
|
||||||
let old_assignation_opt = self.update_node_id_vec()?;
|
let old_assignation_opt = self.update_node_id_vec()?;
|
||||||
|
|
||||||
|
//We update the parameters
|
||||||
self.parameters = self.staged_parameters.get().clone();
|
self.parameters = self.staged_parameters.get().clone();
|
||||||
|
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
|
@ -420,14 +424,14 @@ impl ClusterLayout {
|
||||||
|
|
||||||
//We generate for once numerical ids for the zones of non gateway nodes,
|
//We generate for once numerical ids for the zones of non gateway nodes,
|
||||||
//to use them as indices in the flow graphs.
|
//to use them as indices in the flow graphs.
|
||||||
let (id_to_zone, zone_to_id) = self.generate_useful_zone_ids()?;
|
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
|
||||||
|
|
||||||
let nb_useful_nodes = self.useful_nodes().len();
|
let nb_nongateway_nodes = self.nongateway_nodes().len();
|
||||||
if nb_useful_nodes < self.replication_factor {
|
if nb_nongateway_nodes < self.replication_factor {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::Message(format!(
|
||||||
"The number of nodes with positive \
|
"The number of nodes with positive \
|
||||||
capacity ({}) is smaller than the replication factor ({}).",
|
capacity ({}) is smaller than the replication factor ({}).",
|
||||||
nb_useful_nodes, self.replication_factor
|
nb_nongateway_nodes, self.replication_factor
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
if id_to_zone.len() < self.parameters.zone_redundancy {
|
if id_to_zone.len() < self.parameters.zone_redundancy {
|
||||||
|
@ -457,6 +461,7 @@ impl ClusterLayout {
|
||||||
partition_size
|
partition_size
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
//We write the partition size.
|
||||||
self.partition_size = partition_size;
|
self.partition_size = partition_size;
|
||||||
|
|
||||||
if partition_size < 100 {
|
if partition_size < 100 {
|
||||||
|
@ -467,14 +472,15 @@ impl ClusterLayout {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute a first flow/assignment that is heuristically close to the previous
|
//We compute a first flow/assignation that is heuristically close to the previous
|
||||||
//assignment
|
//assignation
|
||||||
let mut gflow = self.compute_candidate_assignment(&zone_to_id, &old_assignation_opt)?;
|
let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
|
||||||
if let Some(assoc) = &old_assignation_opt {
|
if let Some(assoc) = &old_assignation_opt {
|
||||||
//We minimize the distance to the previous assignment.
|
//We minimize the distance to the previous assignation.
|
||||||
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
|
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//We display statistics of the computation
|
||||||
msg.append(&mut self.output_stat(
|
msg.append(&mut self.output_stat(
|
||||||
&gflow,
|
&gflow,
|
||||||
&old_assignation_opt,
|
&old_assignation_opt,
|
||||||
|
@ -538,14 +544,13 @@ impl ClusterLayout {
|
||||||
// (2) We retrieve the old association
|
// (2) We retrieve the old association
|
||||||
//We rewrite the old association with the new indices. We only consider partition
|
//We rewrite the old association with the new indices. We only consider partition
|
||||||
//to node assignations where the node is still in use.
|
//to node assignations where the node is still in use.
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
|
||||||
let mut old_assignation = vec![Vec::<usize>::new(); nb_partitions];
|
|
||||||
|
|
||||||
if self.ring_assignation_data.is_empty() {
|
if self.ring_assignation_data.is_empty() {
|
||||||
//This is a new association
|
//This is a new association
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
|
if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
|
||||||
return Err(Error::Message(
|
return Err(Error::Message(
|
||||||
"The old assignation does not have a size corresponding to \
|
"The old assignation does not have a size corresponding to \
|
||||||
the old replication factor or the number of partitions."
|
the old replication factor or the number of partitions."
|
||||||
|
@ -580,11 +585,11 @@ impl ClusterLayout {
|
||||||
|
|
||||||
///This function generates ids for the zone of the nodes appearing in
|
///This function generates ids for the zone of the nodes appearing in
|
||||||
///self.node_id_vec.
|
///self.node_id_vec.
|
||||||
fn generate_useful_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
|
fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
|
||||||
let mut id_to_zone = Vec::<String>::new();
|
let mut id_to_zone = Vec::<String>::new();
|
||||||
let mut zone_to_id = HashMap::<String, usize>::new();
|
let mut zone_to_id = HashMap::<String, usize>::new();
|
||||||
|
|
||||||
for uuid in self.useful_nodes().iter() {
|
for uuid in self.nongateway_nodes().iter() {
|
||||||
if self.roles.get(uuid) == None {
|
if self.roles.get(uuid) == None {
|
||||||
return Err(Error::Message(
|
return Err(Error::Message(
|
||||||
"The uuid was not found in the node roles (this should \
|
"The uuid was not found in the node roles (this should \
|
||||||
|
@ -603,17 +608,16 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function computes by dichotomy the largest realizable partition size, given
|
///This function computes by dichotomy the largest realizable partition size, given
|
||||||
///the layout.
|
///the layout roles and parameters.
|
||||||
fn compute_optimal_partition_size(
|
fn compute_optimal_partition_size(
|
||||||
&self,
|
&self,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
|
||||||
let empty_set = HashSet::<(usize, usize)>::new();
|
let empty_set = HashSet::<(usize, usize)>::new();
|
||||||
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
|
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
if g.get_flow_value()?
|
if g.get_flow_value()?
|
||||||
< (nb_partitions * self.replication_factor)
|
< (NB_PARTITIONS * self.replication_factor)
|
||||||
.try_into()
|
.try_into()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
{
|
{
|
||||||
|
@ -630,7 +634,7 @@ impl ClusterLayout {
|
||||||
g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
|
g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
if g.get_flow_value()?
|
if g.get_flow_value()?
|
||||||
< (nb_partitions * self.replication_factor)
|
< (NB_PARTITIONS * self.replication_factor)
|
||||||
.try_into()
|
.try_into()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
{
|
{
|
||||||
|
@ -658,14 +662,21 @@ impl ClusterLayout {
|
||||||
vertices
|
vertices
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///Generates the graph to compute the maximal flow corresponding to the optimal
|
||||||
|
///partition assignation.
|
||||||
|
///exclude_assoc is the set of (partition, node) association that we are forbidden
|
||||||
|
///to use (hence we do not add the corresponding edge to the graph). This parameter
|
||||||
|
///is used to compute a first flow that uses only edges appearing in the previous
|
||||||
|
///assignation. This produces a solution that heuristically should be close to the
|
||||||
|
///previous one.
|
||||||
fn generate_flow_graph(
|
fn generate_flow_graph(
|
||||||
&self,
|
&self,
|
||||||
size: u32,
|
partition_size: u32,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
exclude_assoc: &HashSet<(usize, usize)>,
|
exclude_assoc: &HashSet<(usize, usize)>,
|
||||||
) -> Result<Graph<FlowEdge>, Error> {
|
) -> Result<Graph<FlowEdge>, Error> {
|
||||||
let vertices =
|
let vertices =
|
||||||
ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.useful_nodes().len());
|
ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
|
||||||
let mut g = Graph::<FlowEdge>::new(&vertices);
|
let mut g = Graph::<FlowEdge>::new(&vertices);
|
||||||
let nb_zones = zone_to_id.len();
|
let nb_zones = zone_to_id.len();
|
||||||
let redundancy = self.parameters.zone_redundancy;
|
let redundancy = self.parameters.zone_redundancy;
|
||||||
|
@ -685,10 +696,10 @@ impl ClusterLayout {
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for n in 0..self.useful_nodes().len() {
|
for n in 0..self.nongateway_nodes().len() {
|
||||||
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
|
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
|
||||||
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
|
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
|
||||||
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / size)?;
|
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
|
||||||
for p in 0..NB_PARTITIONS {
|
for p in 0..NB_PARTITIONS {
|
||||||
if !exclude_assoc.contains(&(p, n)) {
|
if !exclude_assoc.contains(&(p, n)) {
|
||||||
g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
|
g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
|
||||||
|
@ -698,28 +709,34 @@ impl ClusterLayout {
|
||||||
Ok(g)
|
Ok(g)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compute_candidate_assignment(
|
///This function computes a first optimal assignation (in the form of a flow graph).
|
||||||
|
fn compute_candidate_assignation(
|
||||||
&self,
|
&self,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
old_assoc_opt: &Option<Vec<Vec<usize>>>,
|
prev_assign_opt: &Option<Vec<Vec<usize>>>,
|
||||||
) -> Result<Graph<FlowEdge>, Error> {
|
) -> Result<Graph<FlowEdge>, Error> {
|
||||||
//We list the edges that are not used in the old association
|
//We list the (partition,node) associations that are not used in the
|
||||||
|
//previous assignation
|
||||||
let mut exclude_edge = HashSet::<(usize, usize)>::new();
|
let mut exclude_edge = HashSet::<(usize, usize)>::new();
|
||||||
if let Some(old_assoc) = old_assoc_opt {
|
if let Some(prev_assign) = prev_assign_opt {
|
||||||
let nb_nodes = self.useful_nodes().len();
|
let nb_nodes = self.nongateway_nodes().len();
|
||||||
for (p, old_assoc_p) in old_assoc.iter().enumerate() {
|
for (p, prev_assign_p) in prev_assign.iter().enumerate() {
|
||||||
for n in 0..nb_nodes {
|
for n in 0..nb_nodes {
|
||||||
exclude_edge.insert((p, n));
|
exclude_edge.insert((p, n));
|
||||||
}
|
}
|
||||||
for n in old_assoc_p.iter() {
|
for n in prev_assign_p.iter() {
|
||||||
exclude_edge.remove(&(p, *n));
|
exclude_edge.remove(&(p, *n));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute the best flow using only the edges used in the old assoc
|
//We compute the best flow using only the edges used in the previous assignation
|
||||||
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
|
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
|
|
||||||
|
//We add the excluded edges and compute the maximal flow with the full graph.
|
||||||
|
//The algorithm is such that it will start with the flow that we just computed
|
||||||
|
//and find ameliorating paths from that.
|
||||||
for (p, n) in exclude_edge.iter() {
|
for (p, n) in exclude_edge.iter() {
|
||||||
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
||||||
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
||||||
|
@ -728,26 +745,35 @@ impl ClusterLayout {
|
||||||
Ok(g)
|
Ok(g)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///This function updates the flow graph gflow to minimize the distance between
|
||||||
|
///its corresponding assignation and the previous one
|
||||||
fn minimize_rebalance_load(
|
fn minimize_rebalance_load(
|
||||||
&self,
|
&self,
|
||||||
gflow: &mut Graph<FlowEdge>,
|
gflow: &mut Graph<FlowEdge>,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
old_assoc: &[Vec<usize>],
|
prev_assign: &[Vec<usize>],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
//We define a cost function on the edges (pairs of vertices) corresponding
|
||||||
|
//to the distance between the two assignations.
|
||||||
let mut cost = CostFunction::new();
|
let mut cost = CostFunction::new();
|
||||||
for (p, assoc_p) in old_assoc.iter().enumerate() {
|
for (p, assoc_p) in prev_assign.iter().enumerate() {
|
||||||
for n in assoc_p.iter() {
|
for n in assoc_p.iter() {
|
||||||
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
||||||
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
|
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let nb_nodes = self.useful_nodes().len();
|
|
||||||
|
//We compute the maximal length of a simple path in gflow. It is used in the
|
||||||
|
//Bellman-Ford algorithm in optimize_flow_with_cost to set the number
|
||||||
|
//of iterations.
|
||||||
|
let nb_nodes = self.nongateway_nodes().len();
|
||||||
let path_length = 4 * nb_nodes;
|
let path_length = 4 * nb_nodes;
|
||||||
gflow.optimize_flow_with_cost(&cost, path_length)?;
|
gflow.optimize_flow_with_cost(&cost, path_length)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///This function updates the assignation ring from the flow graph.
|
||||||
fn update_ring_from_flow(
|
fn update_ring_from_flow(
|
||||||
&mut self,
|
&mut self,
|
||||||
nb_zones: usize,
|
nb_zones: usize,
|
||||||
|
@ -775,19 +801,18 @@ impl ClusterLayout {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
//This function returns a message summing up the partition repartition of the new
|
///This function returns a message summing up the partition repartition of the new
|
||||||
//layout.
|
///layout, and other statistics of the partition assignation computation.
|
||||||
fn output_stat(
|
fn output_stat(
|
||||||
&self,
|
&self,
|
||||||
gflow: &Graph<FlowEdge>,
|
gflow: &Graph<FlowEdge>,
|
||||||
old_assoc_opt: &Option<Vec<Vec<usize>>>,
|
prev_assign_opt: &Option<Vec<Vec<usize>>>,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
id_to_zone: &[String],
|
id_to_zone: &[String],
|
||||||
) -> Result<Message, Error> {
|
) -> Result<Message, Error> {
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
|
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32;
|
||||||
let used_cap = self.partition_size * nb_partitions as u32 * self.replication_factor as u32;
|
|
||||||
let total_cap = self.get_total_capacity()?;
|
let total_cap = self.get_total_capacity()?;
|
||||||
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
|
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
|
||||||
msg.push("".into());
|
msg.push("".into());
|
||||||
|
@ -813,21 +838,21 @@ impl ClusterLayout {
|
||||||
));
|
));
|
||||||
|
|
||||||
//We define and fill in the following tables
|
//We define and fill in the following tables
|
||||||
let storing_nodes = self.useful_nodes();
|
let storing_nodes = self.nongateway_nodes();
|
||||||
let mut new_partitions = vec![0; storing_nodes.len()];
|
let mut new_partitions = vec![0; storing_nodes.len()];
|
||||||
let mut stored_partitions = vec![0; storing_nodes.len()];
|
let mut stored_partitions = vec![0; storing_nodes.len()];
|
||||||
|
|
||||||
let mut new_partitions_zone = vec![0; id_to_zone.len()];
|
let mut new_partitions_zone = vec![0; id_to_zone.len()];
|
||||||
let mut stored_partitions_zone = vec![0; id_to_zone.len()];
|
let mut stored_partitions_zone = vec![0; id_to_zone.len()];
|
||||||
|
|
||||||
for p in 0..nb_partitions {
|
for p in 0..NB_PARTITIONS {
|
||||||
for z in 0..id_to_zone.len() {
|
for z in 0..id_to_zone.len() {
|
||||||
let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
|
let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
|
||||||
if !pz_nodes.is_empty() {
|
if !pz_nodes.is_empty() {
|
||||||
stored_partitions_zone[z] += 1;
|
stored_partitions_zone[z] += 1;
|
||||||
if let Some(old_assoc) = old_assoc_opt {
|
if let Some(prev_assign) = prev_assign_opt {
|
||||||
let mut old_zones_of_p = Vec::<usize>::new();
|
let mut old_zones_of_p = Vec::<usize>::new();
|
||||||
for n in old_assoc[p].iter() {
|
for n in prev_assign[p].iter() {
|
||||||
old_zones_of_p
|
old_zones_of_p
|
||||||
.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
|
.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
|
||||||
}
|
}
|
||||||
|
@ -839,8 +864,8 @@ impl ClusterLayout {
|
||||||
for vert in pz_nodes.iter() {
|
for vert in pz_nodes.iter() {
|
||||||
if let Vertex::N(n) = *vert {
|
if let Vertex::N(n) = *vert {
|
||||||
stored_partitions[n] += 1;
|
stored_partitions[n] += 1;
|
||||||
if let Some(old_assoc) = old_assoc_opt {
|
if let Some(prev_assign) = prev_assign_opt {
|
||||||
if !old_assoc[p].contains(&n) {
|
if !prev_assign[p].contains(&n) {
|
||||||
new_partitions[n] += 1;
|
new_partitions[n] += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -849,7 +874,7 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if *old_assoc_opt == None {
|
if *prev_assign_opt == None {
|
||||||
new_partitions = stored_partitions.clone();
|
new_partitions = stored_partitions.clone();
|
||||||
new_partitions_zone = stored_partitions_zone.clone();
|
new_partitions_zone = stored_partitions_zone.clone();
|
||||||
}
|
}
|
||||||
|
@ -857,7 +882,7 @@ impl ClusterLayout {
|
||||||
//We display the statistics
|
//We display the statistics
|
||||||
|
|
||||||
msg.push("".into());
|
msg.push("".into());
|
||||||
if *old_assoc_opt != None {
|
if *prev_assign_opt != None {
|
||||||
let total_new_partitions: usize = new_partitions.iter().sum();
|
let total_new_partitions: usize = new_partitions.iter().sum();
|
||||||
msg.push(format!(
|
msg.push(format!(
|
||||||
"A total of {} new copies of partitions need to be \
|
"A total of {} new copies of partitions need to be \
|
||||||
|
@ -950,9 +975,8 @@ mod tests {
|
||||||
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
|
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
|
||||||
let over_size = cl.partition_size + 1;
|
let over_size = cl.partition_size + 1;
|
||||||
let mut zone_token = HashMap::<String, usize>::new();
|
let mut zone_token = HashMap::<String, usize>::new();
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
|
||||||
|
|
||||||
let (zones, zone_to_id) = cl.generate_useful_zone_ids()?;
|
let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
|
||||||
|
|
||||||
if zones.is_empty() {
|
if zones.is_empty() {
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
|
@ -961,12 +985,12 @@ mod tests {
|
||||||
for z in zones.iter() {
|
for z in zones.iter() {
|
||||||
zone_token.insert(z.clone(), 0);
|
zone_token.insert(z.clone(), 0);
|
||||||
}
|
}
|
||||||
for uuid in cl.useful_nodes().iter() {
|
for uuid in cl.nongateway_nodes().iter() {
|
||||||
let z = cl.get_node_zone(uuid)?;
|
let z = cl.get_node_zone(uuid)?;
|
||||||
let c = cl.get_node_capacity(uuid)?;
|
let c = cl.get_node_capacity(uuid)?;
|
||||||
zone_token.insert(
|
zone_token.insert(
|
||||||
z.clone(),
|
z.clone(),
|
||||||
zone_token[&z] + min(nb_partitions, (c / over_size) as usize),
|
zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -978,15 +1002,15 @@ mod tests {
|
||||||
id_zone_token[zone_to_id[z]] = *t;
|
id_zone_token[zone_to_id[z]] = *t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut nb_token = vec![0; nb_partitions];
|
let mut nb_token = vec![0; NB_PARTITIONS];
|
||||||
let mut last_zone = vec![zones.len(); nb_partitions];
|
let mut last_zone = vec![zones.len(); NB_PARTITIONS];
|
||||||
|
|
||||||
let mut curr_zone = 0;
|
let mut curr_zone = 0;
|
||||||
|
|
||||||
let redundancy = cl.parameters.zone_redundancy;
|
let redundancy = cl.parameters.zone_redundancy;
|
||||||
|
|
||||||
for replic in 0..cl.replication_factor {
|
for replic in 0..cl.replication_factor {
|
||||||
for p in 0..nb_partitions {
|
for p in 0..NB_PARTITIONS {
|
||||||
while id_zone_token[curr_zone] == 0
|
while id_zone_token[curr_zone] == 0
|
||||||
|| (last_zone[p] == curr_zone
|
|| (last_zone[p] == curr_zone
|
||||||
&& redundancy - nb_token[p] <= cl.replication_factor - replic)
|
&& redundancy - nb_token[p] <= cl.replication_factor - replic)
|
||||||
|
|
Loading…
Reference in a new issue