Garage v0.9 #473
5 changed files with 271 additions and 277 deletions
|
@ -86,7 +86,7 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
|
||||||
.map(|(k, _, v)| (hex::encode(k), v.0.clone()))
|
.map(|(k, _, v)| (hex::encode(k), v.0.clone()))
|
||||||
.collect(),
|
.collect(),
|
||||||
staged_role_changes: layout
|
staged_role_changes: layout
|
||||||
.staging
|
.staging_roles
|
||||||
.items()
|
.items()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(k, _, v)| layout.roles.get(k) != Some(v))
|
.filter(|(k, _, v)| layout.roles.get(k) != Some(v))
|
||||||
|
@ -137,14 +137,14 @@ pub async fn handle_update_cluster_layout(
|
||||||
let mut layout = garage.system.get_cluster_layout();
|
let mut layout = garage.system.get_cluster_layout();
|
||||||
|
|
||||||
let mut roles = layout.roles.clone();
|
let mut roles = layout.roles.clone();
|
||||||
roles.merge(&layout.staging);
|
roles.merge(&layout.staging_roles);
|
||||||
|
|
||||||
for (node, role) in updates {
|
for (node, role) in updates {
|
||||||
let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
|
let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
|
||||||
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
|
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
|
||||||
|
|
||||||
layout
|
layout
|
||||||
.staging
|
.staging_roles
|
||||||
.merge(&roles.update_mutator(node, NodeRoleV(role)));
|
.merge(&roles.update_mutator(node, NodeRoleV(role)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
let new_role = match layout.staging.get(&adv.id) {
|
let new_role = match layout.staging_roles.get(&adv.id) {
|
||||||
Some(NodeRoleV(Some(_))) => "(pending)",
|
Some(NodeRoleV(Some(_))) => "(pending)",
|
||||||
_ => "NO ROLE ASSIGNED",
|
_ => "NO ROLE ASSIGNED",
|
||||||
};
|
};
|
||||||
|
|
|
@ -63,14 +63,14 @@ pub async fn cmd_assign_role(
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
let mut roles = layout.roles.clone();
|
let mut roles = layout.roles.clone();
|
||||||
roles.merge(&layout.staging);
|
roles.merge(&layout.staging_roles);
|
||||||
|
|
||||||
for replaced in args.replace.iter() {
|
for replaced in args.replace.iter() {
|
||||||
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
|
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
|
||||||
match roles.get(&replaced_node) {
|
match roles.get(&replaced_node) {
|
||||||
Some(NodeRoleV(Some(_))) => {
|
Some(NodeRoleV(Some(_))) => {
|
||||||
layout
|
layout
|
||||||
.staging
|
.staging_roles
|
||||||
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
|
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
@ -128,7 +128,7 @@ pub async fn cmd_assign_role(
|
||||||
};
|
};
|
||||||
|
|
||||||
layout
|
layout
|
||||||
.staging
|
.staging_roles
|
||||||
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
|
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,13 +148,13 @@ pub async fn cmd_remove_role(
|
||||||
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||||
|
|
||||||
let mut roles = layout.roles.clone();
|
let mut roles = layout.roles.clone();
|
||||||
roles.merge(&layout.staging);
|
roles.merge(&layout.staging_roles);
|
||||||
|
|
||||||
let deleted_node =
|
let deleted_node =
|
||||||
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
|
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
|
||||||
|
|
||||||
layout
|
layout
|
||||||
.staging
|
.staging_roles
|
||||||
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
|
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
|
||||||
|
|
||||||
send_layout(rpc_cli, rpc_host, layout).await?;
|
send_layout(rpc_cli, rpc_host, layout).await?;
|
||||||
|
@ -278,7 +278,7 @@ pub async fn cmd_config_layout(
|
||||||
println!("The zone redundancy must be at least 1.");
|
println!("The zone redundancy must be at least 1.");
|
||||||
} else {
|
} else {
|
||||||
layout
|
layout
|
||||||
.staged_parameters
|
.staging_parameters
|
||||||
.update(LayoutParameters { zone_redundancy: r });
|
.update(LayoutParameters { zone_redundancy: r });
|
||||||
println!("The new zone redundancy has been saved ({}).", r);
|
println!("The new zone redundancy has been saved ({}).", r);
|
||||||
}
|
}
|
||||||
|
@ -352,13 +352,13 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
|
pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
|
||||||
let has_changes = layout.staged_parameters.get().clone() != layout.parameters;
|
let has_changes = layout.staging_parameters.get().clone() != layout.parameters;
|
||||||
if has_changes {
|
if has_changes {
|
||||||
println!();
|
println!();
|
||||||
println!("==== NEW LAYOUT PARAMETERS ====");
|
println!("==== NEW LAYOUT PARAMETERS ====");
|
||||||
println!(
|
println!(
|
||||||
"Zone redundancy: {}",
|
"Zone redundancy: {}",
|
||||||
layout.staged_parameters.get().zone_redundancy
|
layout.staging_parameters.get().zone_redundancy
|
||||||
);
|
);
|
||||||
println!();
|
println!();
|
||||||
}
|
}
|
||||||
|
@ -367,7 +367,7 @@ pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
|
||||||
|
|
||||||
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
|
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
|
||||||
let has_changes = layout
|
let has_changes = layout
|
||||||
.staging
|
.staging_roles
|
||||||
.items()
|
.items()
|
||||||
.iter()
|
.iter()
|
||||||
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
|
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
|
||||||
|
@ -376,7 +376,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
|
||||||
println!();
|
println!();
|
||||||
println!("==== STAGED ROLE CHANGES ====");
|
println!("==== STAGED ROLE CHANGES ====");
|
||||||
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
|
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
|
||||||
for (id, _, role) in layout.staging.items().iter() {
|
for (id, _, role) in layout.staging_roles.items().iter() {
|
||||||
if layout.roles.get(id) == Some(role) {
|
if layout.roles.get(id) == Some(role) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,33 +6,33 @@ use std::cmp::{max, min};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
///Vertex data structures used in all the graphs used in layout.rs.
|
/// Vertex data structures used in all the graphs used in layout.rs.
|
||||||
///usize parameters correspond to node/zone/partitions ids.
|
/// usize parameters correspond to node/zone/partitions ids.
|
||||||
///To understand the vertex roles below, please refer to the formal description
|
/// To understand the vertex roles below, please refer to the formal description
|
||||||
///of the layout computation algorithm.
|
/// of the layout computation algorithm.
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
pub enum Vertex {
|
pub enum Vertex {
|
||||||
Source,
|
Source,
|
||||||
Pup(usize), //The vertex p+ of partition p
|
Pup(usize), // The vertex p+ of partition p
|
||||||
Pdown(usize), //The vertex p- of partition p
|
Pdown(usize), // The vertex p- of partition p
|
||||||
PZ(usize, usize), //The vertex corresponding to x_(partition p, zone z)
|
PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z)
|
||||||
N(usize), //The vertex corresponding to node n
|
N(usize), // The vertex corresponding to node n
|
||||||
Sink,
|
Sink,
|
||||||
}
|
}
|
||||||
|
|
||||||
///Edge data structure for the flow algorithm.
|
/// Edge data structure for the flow algorithm.
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct FlowEdge {
|
pub struct FlowEdge {
|
||||||
cap: u32, //flow maximal capacity of the edge
|
cap: u32, // flow maximal capacity of the edge
|
||||||
flow: i32, //flow value on the edge
|
flow: i32, // flow value on the edge
|
||||||
dest: usize, //destination vertex id
|
dest: usize, // destination vertex id
|
||||||
rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v
|
rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v
|
||||||
}
|
}
|
||||||
|
|
||||||
///Edge data structure for the detection of negative cycles.
|
/// Edge data structure for the detection of negative cycles.
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub struct WeightedEdge {
|
pub struct WeightedEdge {
|
||||||
w: i32, //weight of the edge
|
w: i32, // weight of the edge
|
||||||
dest: usize,
|
dest: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,14 +40,14 @@ pub trait Edge: Clone + Copy {}
|
||||||
impl Edge for FlowEdge {}
|
impl Edge for FlowEdge {}
|
||||||
impl Edge for WeightedEdge {}
|
impl Edge for WeightedEdge {}
|
||||||
|
|
||||||
///Struct for the graph structure. We do encapsulation here to be able to both
|
/// Struct for the graph structure. We do encapsulation here to be able to both
|
||||||
///provide user friendly Vertex enum to address vertices, and to use internally usize
|
/// provide user friendly Vertex enum to address vertices, and to use internally usize
|
||||||
///indices and Vec instead of HashMap in the graph algorithm to optimize execution speed.
|
/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed.
|
||||||
pub struct Graph<E: Edge> {
|
pub struct Graph<E: Edge> {
|
||||||
vertextoid: HashMap<Vertex, usize>,
|
vertex_to_id: HashMap<Vertex, usize>,
|
||||||
idtovertex: Vec<Vertex>,
|
id_to_vertex: Vec<Vertex>,
|
||||||
|
|
||||||
//The graph is stored as an adjacency list
|
// The graph is stored as an adjacency list
|
||||||
graph: Vec<Vec<E>>,
|
graph: Vec<Vec<E>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,22 +60,30 @@ impl<E: Edge> Graph<E> {
|
||||||
map.insert(*vert, i);
|
map.insert(*vert, i);
|
||||||
}
|
}
|
||||||
Graph::<E> {
|
Graph::<E> {
|
||||||
vertextoid: map,
|
vertex_to_id: map,
|
||||||
idtovertex: vertices.to_vec(),
|
id_to_vertex: vertices.to_vec(),
|
||||||
graph: vec![Vec::<E>::new(); vertices.len()],
|
graph: vec![Vec::<E>::new(); vertices.len()],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_vertex_id(&self, v: &Vertex) -> Result<usize, String> {
|
||||||
|
self.vertex_to_id
|
||||||
|
.get(v)
|
||||||
|
.cloned()
|
||||||
|
.ok_or_else(|| format!("The graph does not contain vertex {:?}", v))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Graph<FlowEdge> {
|
impl Graph<FlowEdge> {
|
||||||
///This function adds a directed edge to the graph with capacity c, and the
|
/// This function adds a directed edge to the graph with capacity c, and the
|
||||||
///corresponding reversed edge with capacity 0.
|
/// corresponding reversed edge with capacity 0.
|
||||||
pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
|
pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
let idu = self.get_vertex_id(&u)?;
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
let idv = self.get_vertex_id(&v)?;
|
||||||
|
if idu == idv {
|
||||||
|
return Err("Cannot add edge from vertex to itself in flow graph".into());
|
||||||
}
|
}
|
||||||
let idu = self.vertextoid[&u];
|
|
||||||
let idv = self.vertextoid[&v];
|
|
||||||
let rev_u = self.graph[idu].len();
|
let rev_u = self.graph[idu].len();
|
||||||
let rev_v = self.graph[idv].len();
|
let rev_v = self.graph[idv].len();
|
||||||
self.graph[idu].push(FlowEdge {
|
self.graph[idu].push(FlowEdge {
|
||||||
|
@ -93,28 +101,22 @@ impl Graph<FlowEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function returns the list of vertices that receive a positive flow from
|
/// This function returns the list of vertices that receive a positive flow from
|
||||||
///vertex v.
|
/// vertex v.
|
||||||
pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
|
pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
let idv = self.get_vertex_id(&v)?;
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
|
||||||
}
|
|
||||||
let idv = self.vertextoid[&v];
|
|
||||||
let mut result = Vec::<Vertex>::new();
|
let mut result = Vec::<Vertex>::new();
|
||||||
for edge in self.graph[idv].iter() {
|
for edge in self.graph[idv].iter() {
|
||||||
if edge.flow > 0 {
|
if edge.flow > 0 {
|
||||||
result.push(self.idtovertex[edge.dest]);
|
result.push(self.id_to_vertex[edge.dest]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function returns the value of the flow incoming to v.
|
/// This function returns the value of the flow incoming to v.
|
||||||
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
|
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
let idv = self.get_vertex_id(&v)?;
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
|
||||||
}
|
|
||||||
let idv = self.vertextoid[&v];
|
|
||||||
let mut result = 0;
|
let mut result = 0;
|
||||||
for edge in self.graph[idv].iter() {
|
for edge in self.graph[idv].iter() {
|
||||||
result += max(0, self.graph[edge.dest][edge.rev].flow);
|
result += max(0, self.graph[edge.dest][edge.rev].flow);
|
||||||
|
@ -122,12 +124,9 @@ impl Graph<FlowEdge> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function returns the value of the flow outgoing from v.
|
/// This function returns the value of the flow outgoing from v.
|
||||||
pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
|
pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
|
||||||
if !self.vertextoid.contains_key(&v) {
|
let idv = self.get_vertex_id(&v)?;
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
|
||||||
}
|
|
||||||
let idv = self.vertextoid[&v];
|
|
||||||
let mut result = 0;
|
let mut result = 0;
|
||||||
for edge in self.graph[idv].iter() {
|
for edge in self.graph[idv].iter() {
|
||||||
result += max(0, edge.flow);
|
result += max(0, edge.flow);
|
||||||
|
@ -135,19 +134,19 @@ impl Graph<FlowEdge> {
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function computes the flow total value by computing the outgoing flow
|
/// This function computes the flow total value by computing the outgoing flow
|
||||||
///from the source.
|
/// from the source.
|
||||||
pub fn get_flow_value(&mut self) -> Result<i32, String> {
|
pub fn get_flow_value(&mut self) -> Result<i32, String> {
|
||||||
self.get_outflow(Vertex::Source)
|
self.get_outflow(Vertex::Source)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function shuffles the order of the edge lists. It keeps the ids of the
|
/// This function shuffles the order of the edge lists. It keeps the ids of the
|
||||||
///reversed edges consistent.
|
/// reversed edges consistent.
|
||||||
fn shuffle_edges(&mut self) {
|
fn shuffle_edges(&mut self) {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
for i in 0..self.graph.len() {
|
for i in 0..self.graph.len() {
|
||||||
self.graph[i].shuffle(&mut rng);
|
self.graph[i].shuffle(&mut rng);
|
||||||
//We need to update the ids of the reverse edges.
|
// We need to update the ids of the reverse edges.
|
||||||
for j in 0..self.graph[i].len() {
|
for j in 0..self.graph[i].len() {
|
||||||
let target_v = self.graph[i][j].dest;
|
let target_v = self.graph[i][j].dest;
|
||||||
let target_rev = self.graph[i][j].rev;
|
let target_rev = self.graph[i][j].rev;
|
||||||
|
@ -156,97 +155,86 @@ impl Graph<FlowEdge> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Computes an upper bound of the flow on the graph
|
/// Computes an upper bound of the flow on the graph
|
||||||
pub fn flow_upper_bound(&self) -> u32 {
|
pub fn flow_upper_bound(&self) -> Result<u32, String> {
|
||||||
let idsource = self.vertextoid[&Vertex::Source];
|
let idsource = self.get_vertex_id(&Vertex::Source)?;
|
||||||
let mut flow_upper_bound = 0;
|
let mut flow_upper_bound = 0;
|
||||||
for edge in self.graph[idsource].iter() {
|
for edge in self.graph[idsource].iter() {
|
||||||
flow_upper_bound += edge.cap;
|
flow_upper_bound += edge.cap;
|
||||||
}
|
}
|
||||||
flow_upper_bound
|
Ok(flow_upper_bound)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function computes the maximal flow using Dinic's algorithm. It starts with
|
/// This function computes the maximal flow using Dinic's algorithm. It starts with
|
||||||
///the flow values already present in the graph. So it is possible to add some edge to
|
/// the flow values already present in the graph. So it is possible to add some edge to
|
||||||
///the graph, compute a flow, add other edges, update the flow.
|
/// the graph, compute a flow, add other edges, update the flow.
|
||||||
pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
|
pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&Vertex::Source) {
|
let idsource = self.get_vertex_id(&Vertex::Source)?;
|
||||||
return Err("The graph does not contain a source.".to_string());
|
let idsink = self.get_vertex_id(&Vertex::Sink)?;
|
||||||
}
|
|
||||||
if !self.vertextoid.contains_key(&Vertex::Sink) {
|
|
||||||
return Err("The graph does not contain a sink.".to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
let idsource = self.vertextoid[&Vertex::Source];
|
|
||||||
let idsink = self.vertextoid[&Vertex::Sink];
|
|
||||||
|
|
||||||
let nb_vertices = self.graph.len();
|
let nb_vertices = self.graph.len();
|
||||||
|
|
||||||
let flow_upper_bound = self.flow_upper_bound();
|
let flow_upper_bound = self.flow_upper_bound()?;
|
||||||
|
|
||||||
//To ensure the dispersion of the associations generated by the
|
// To ensure the dispersion of the associations generated by the
|
||||||
//assignation, we shuffle the neighbours of the nodes. Hence,
|
// assignation, we shuffle the neighbours of the nodes. Hence,
|
||||||
//the vertices do not consider their neighbours in the same order.
|
// the vertices do not consider their neighbours in the same order.
|
||||||
self.shuffle_edges();
|
self.shuffle_edges();
|
||||||
|
|
||||||
//We run Dinic's max flow algorithm
|
// We run Dinic's max flow algorithm
|
||||||
loop {
|
loop {
|
||||||
//We build the level array from Dinic's algorithm.
|
// We build the level array from Dinic's algorithm.
|
||||||
let mut level = vec![None; nb_vertices];
|
let mut level = vec![None; nb_vertices];
|
||||||
|
|
||||||
let mut fifo = VecDeque::new();
|
let mut fifo = VecDeque::new();
|
||||||
fifo.push_back((idsource, 0));
|
fifo.push_back((idsource, 0));
|
||||||
while !fifo.is_empty() {
|
while let Some((id, lvl)) = fifo.pop_front() {
|
||||||
if let Some((id, lvl)) = fifo.pop_front() {
|
if level[id] == None {
|
||||||
if level[id] == None {
|
// it means id has not yet been reached
|
||||||
//it means id has not yet been reached
|
level[id] = Some(lvl);
|
||||||
level[id] = Some(lvl);
|
for edge in self.graph[id].iter() {
|
||||||
for edge in self.graph[id].iter() {
|
if edge.cap as i32 - edge.flow > 0 {
|
||||||
if edge.cap as i32 - edge.flow > 0 {
|
fifo.push_back((edge.dest, lvl + 1));
|
||||||
fifo.push_back((edge.dest, lvl + 1));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if level[idsink] == None {
|
if level[idsink] == None {
|
||||||
//There is no residual flow
|
// There is no residual flow
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
//Now we run DFS respecting the level array
|
// Now we run DFS respecting the level array
|
||||||
let mut next_nbd = vec![0; nb_vertices];
|
let mut next_nbd = vec![0; nb_vertices];
|
||||||
let mut lifo = VecDeque::new();
|
let mut lifo = Vec::new();
|
||||||
|
|
||||||
lifo.push_back((idsource, flow_upper_bound));
|
lifo.push((idsource, flow_upper_bound));
|
||||||
|
|
||||||
while let Some((id_tmp, f_tmp)) = lifo.back() {
|
while let Some((id, f)) = lifo.last().cloned() {
|
||||||
let id = *id_tmp;
|
|
||||||
let f = *f_tmp;
|
|
||||||
if id == idsink {
|
if id == idsink {
|
||||||
//The DFS reached the sink, we can add a
|
// The DFS reached the sink, we can add a
|
||||||
//residual flow.
|
// residual flow.
|
||||||
lifo.pop_back();
|
lifo.pop();
|
||||||
while let Some((id, _)) = lifo.pop_back() {
|
while let Some((id, _)) = lifo.pop() {
|
||||||
let nbd = next_nbd[id];
|
let nbd = next_nbd[id];
|
||||||
self.graph[id][nbd].flow += f as i32;
|
self.graph[id][nbd].flow += f as i32;
|
||||||
let id_rev = self.graph[id][nbd].dest;
|
let id_rev = self.graph[id][nbd].dest;
|
||||||
let nbd_rev = self.graph[id][nbd].rev;
|
let nbd_rev = self.graph[id][nbd].rev;
|
||||||
self.graph[id_rev][nbd_rev].flow -= f as i32;
|
self.graph[id_rev][nbd_rev].flow -= f as i32;
|
||||||
}
|
}
|
||||||
lifo.push_back((idsource, flow_upper_bound));
|
lifo.push((idsource, flow_upper_bound));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
//else we did not reach the sink
|
// else we did not reach the sink
|
||||||
let nbd = next_nbd[id];
|
let nbd = next_nbd[id];
|
||||||
if nbd >= self.graph[id].len() {
|
if nbd >= self.graph[id].len() {
|
||||||
//There is nothing to explore from id anymore
|
// There is nothing to explore from id anymore
|
||||||
lifo.pop_back();
|
lifo.pop();
|
||||||
if let Some((parent, _)) = lifo.back() {
|
if let Some((parent, _)) = lifo.last() {
|
||||||
next_nbd[*parent] += 1;
|
next_nbd[*parent] += 1;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
//else we can try to send flow from id to its nbd
|
// else we can try to send flow from id to its nbd
|
||||||
let new_flow = min(
|
let new_flow = min(
|
||||||
f as i32,
|
f as i32,
|
||||||
self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow,
|
self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow,
|
||||||
|
@ -257,19 +245,19 @@ impl Graph<FlowEdge> {
|
||||||
}
|
}
|
||||||
if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) {
|
if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) {
|
||||||
if lvldest <= lvlid {
|
if lvldest <= lvlid {
|
||||||
//We cannot send flow to nbd.
|
// We cannot send flow to nbd.
|
||||||
next_nbd[id] += 1;
|
next_nbd[id] += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//otherwise, we send flow to nbd.
|
// otherwise, we send flow to nbd.
|
||||||
lifo.push_back((self.graph[id][nbd].dest, new_flow));
|
lifo.push((self.graph[id][nbd].dest, new_flow));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function takes a flow, and a cost function on the edges, and tries to find an
|
/// This function takes a flow, and a cost function on the edges, and tries to find an
|
||||||
/// equivalent flow with a better cost, by finding improving overflow cycles. It uses
|
/// equivalent flow with a better cost, by finding improving overflow cycles. It uses
|
||||||
/// as subroutine the Bellman Ford algorithm run up to path_length.
|
/// as subroutine the Bellman Ford algorithm run up to path_length.
|
||||||
/// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
|
/// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
|
||||||
|
@ -279,19 +267,19 @@ impl Graph<FlowEdge> {
|
||||||
cost: &CostFunction,
|
cost: &CostFunction,
|
||||||
path_length: usize,
|
path_length: usize,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
//We build the weighted graph g where we will look for negative cycle
|
// We build the weighted graph g where we will look for negative cycle
|
||||||
let mut gf = self.build_cost_graph(cost)?;
|
let mut gf = self.build_cost_graph(cost)?;
|
||||||
let mut cycles = gf.list_negative_cycles(path_length);
|
let mut cycles = gf.list_negative_cycles(path_length);
|
||||||
while !cycles.is_empty() {
|
while !cycles.is_empty() {
|
||||||
//we enumerate negative cycles
|
// we enumerate negative cycles
|
||||||
for c in cycles.iter() {
|
for c in cycles.iter() {
|
||||||
for i in 0..c.len() {
|
for i in 0..c.len() {
|
||||||
//We add one flow unit to the edge (u,v) of cycle c
|
// We add one flow unit to the edge (u,v) of cycle c
|
||||||
let idu = self.vertextoid[&c[i]];
|
let idu = self.vertex_to_id[&c[i]];
|
||||||
let idv = self.vertextoid[&c[(i + 1) % c.len()]];
|
let idv = self.vertex_to_id[&c[(i + 1) % c.len()]];
|
||||||
for j in 0..self.graph[idu].len() {
|
for j in 0..self.graph[idu].len() {
|
||||||
//since idu appears at most once in the cycles, we enumerate every
|
// since idu appears at most once in the cycles, we enumerate every
|
||||||
//edge at most once.
|
// edge at most once.
|
||||||
let edge = self.graph[idu][j];
|
let edge = self.graph[idu][j];
|
||||||
if edge.dest == idv {
|
if edge.dest == idv {
|
||||||
self.graph[idu][j].flow += 1;
|
self.graph[idu][j].flow += 1;
|
||||||
|
@ -308,16 +296,16 @@ impl Graph<FlowEdge> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///Construct the weighted graph G_f from the flow and the cost function
|
/// Construct the weighted graph G_f from the flow and the cost function
|
||||||
fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
|
fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
|
||||||
let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
|
let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex);
|
||||||
let nb_vertices = self.idtovertex.len();
|
let nb_vertices = self.id_to_vertex.len();
|
||||||
for i in 0..nb_vertices {
|
for i in 0..nb_vertices {
|
||||||
for edge in self.graph[i].iter() {
|
for edge in self.graph[i].iter() {
|
||||||
if edge.cap as i32 - edge.flow > 0 {
|
if edge.cap as i32 - edge.flow > 0 {
|
||||||
//It is possible to send overflow through this edge
|
// It is possible to send overflow through this edge
|
||||||
let u = self.idtovertex[i];
|
let u = self.id_to_vertex[i];
|
||||||
let v = self.idtovertex[edge.dest];
|
let v = self.id_to_vertex[edge.dest];
|
||||||
if cost.contains_key(&(u, v)) {
|
if cost.contains_key(&(u, v)) {
|
||||||
g.add_edge(u, v, cost[&(u, v)])?;
|
g.add_edge(u, v, cost[&(u, v)])?;
|
||||||
} else if cost.contains_key(&(v, u)) {
|
} else if cost.contains_key(&(v, u)) {
|
||||||
|
@ -333,29 +321,26 @@ impl Graph<FlowEdge> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Graph<WeightedEdge> {
|
impl Graph<WeightedEdge> {
|
||||||
///This function adds a single directed weighted edge to the graph.
|
/// This function adds a single directed weighted edge to the graph.
|
||||||
pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
|
pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
|
||||||
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
|
let idu = self.get_vertex_id(&u)?;
|
||||||
return Err("The graph does not contain the provided vertex.".to_string());
|
let idv = self.get_vertex_id(&v)?;
|
||||||
}
|
|
||||||
let idu = self.vertextoid[&u];
|
|
||||||
let idv = self.vertextoid[&v];
|
|
||||||
self.graph[idu].push(WeightedEdge { w, dest: idv });
|
self.graph[idu].push(WeightedEdge { w, dest: idv });
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function lists the negative cycles it manages to find after path_length
|
/// This function lists the negative cycles it manages to find after path_length
|
||||||
///iterations of the main loop of the Bellman-Ford algorithm. For the classical
|
/// iterations of the main loop of the Bellman-Ford algorithm. For the classical
|
||||||
///algorithm, path_length needs to be equal to the number of vertices. However,
|
/// algorithm, path_length needs to be equal to the number of vertices. However,
|
||||||
///for particular graph structures like in our case, the algorithm is still correct
|
/// for particular graph structures like in our case, the algorithm is still correct
|
||||||
///when path_length is the length of the longest possible simple path.
|
/// when path_length is the length of the longest possible simple path.
|
||||||
///See the formal description of the algorithm for more details.
|
/// See the formal description of the algorithm for more details.
|
||||||
fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
|
fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
|
||||||
let nb_vertices = self.graph.len();
|
let nb_vertices = self.graph.len();
|
||||||
|
|
||||||
//We start with every vertex at distance 0 of some imaginary extra -1 vertex.
|
// We start with every vertex at distance 0 of some imaginary extra -1 vertex.
|
||||||
let mut distance = vec![0; nb_vertices];
|
let mut distance = vec![0; nb_vertices];
|
||||||
//The prev vector collects for every vertex from where does the shortest path come
|
// The prev vector collects for every vertex from where does the shortest path come
|
||||||
let mut prev = vec![None; nb_vertices];
|
let mut prev = vec![None; nb_vertices];
|
||||||
|
|
||||||
for _ in 0..path_length + 1 {
|
for _ in 0..path_length + 1 {
|
||||||
|
@ -369,29 +354,35 @@ impl Graph<WeightedEdge> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//If self.graph contains a negative cycle, then at this point the graph described
|
// If self.graph contains a negative cycle, then at this point the graph described
|
||||||
//by prev (which is a directed 1-forest/functional graph)
|
// by prev (which is a directed 1-forest/functional graph)
|
||||||
//must contain a cycle. We list the cycles of prev.
|
// must contain a cycle. We list the cycles of prev.
|
||||||
let cycles_prev = cycles_of_1_forest(&prev);
|
let cycles_prev = cycles_of_1_forest(&prev);
|
||||||
|
|
||||||
//Remark that the cycle in prev is in the reverse order compared to the cycle
|
// Remark that the cycle in prev is in the reverse order compared to the cycle
|
||||||
//in the graph. Thus the .rev().
|
// in the graph. Thus the .rev().
|
||||||
return cycles_prev
|
return cycles_prev
|
||||||
.iter()
|
.iter()
|
||||||
.map(|cycle| cycle.iter().rev().map(|id| self.idtovertex[*id]).collect())
|
.map(|cycle| {
|
||||||
|
cycle
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.map(|id| self.id_to_vertex[*id])
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function returns the list of cycles of a directed 1 forest. It does not
|
/// This function returns the list of cycles of a directed 1 forest. It does not
|
||||||
///check for the consistency of the input.
|
/// check for the consistency of the input.
|
||||||
fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
|
fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
|
||||||
let mut cycles = Vec::<Vec<usize>>::new();
|
let mut cycles = Vec::<Vec<usize>>::new();
|
||||||
let mut time_of_discovery = vec![None; forest.len()];
|
let mut time_of_discovery = vec![None; forest.len()];
|
||||||
|
|
||||||
for t in 0..forest.len() {
|
for t in 0..forest.len() {
|
||||||
let mut id = t;
|
let mut id = t;
|
||||||
//while we are on a valid undiscovered node
|
// while we are on a valid undiscovered node
|
||||||
while time_of_discovery[id] == None {
|
while time_of_discovery[id] == None {
|
||||||
time_of_discovery[id] = Some(t);
|
time_of_discovery[id] = Some(t);
|
||||||
if let Some(i) = forest[id] {
|
if let Some(i) = forest[id] {
|
||||||
|
@ -401,8 +392,8 @@ fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if forest[id] != None && time_of_discovery[id] == Some(t) {
|
if forest[id] != None && time_of_discovery[id] == Some(t) {
|
||||||
//We discovered an id that we explored at this iteration t.
|
// We discovered an id that we explored at this iteration t.
|
||||||
//It means we are on a cycle
|
// It means we are on a cycle
|
||||||
let mut cy = vec![id; 1];
|
let mut cy = vec![id; 1];
|
||||||
let mut id2 = id;
|
let mut id2 = id;
|
||||||
while let Some(id_next) = forest[id2] {
|
while let Some(id_next) = forest[id2] {
|
||||||
|
|
|
@ -19,7 +19,7 @@ use std::convert::TryInto;
|
||||||
|
|
||||||
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
|
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
|
||||||
|
|
||||||
//The Message type will be used to collect information on the algorithm.
|
// The Message type will be used to collect information on the algorithm.
|
||||||
type Message = Vec<String>;
|
type Message = Vec<String>;
|
||||||
|
|
||||||
/// The layout of the cluster, i.e. the list of roles
|
/// The layout of the cluster, i.e. the list of roles
|
||||||
|
@ -30,11 +30,11 @@ pub struct ClusterLayout {
|
||||||
|
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
|
|
||||||
///This attribute is only used to retain the previously computed partition size,
|
/// This attribute is only used to retain the previously computed partition size,
|
||||||
///to know to what extent does it change with the layout update.
|
/// to know to what extent does it change with the layout update.
|
||||||
pub partition_size: u32,
|
pub partition_size: u32,
|
||||||
///Parameters used to compute the assignation currently given by
|
/// Parameters used to compute the assignation currently given by
|
||||||
///ring_assignation_data
|
/// ring_assignation_data
|
||||||
pub parameters: LayoutParameters,
|
pub parameters: LayoutParameters,
|
||||||
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
@ -53,14 +53,14 @@ pub struct ClusterLayout {
|
||||||
pub ring_assignation_data: Vec<CompactNodeType>,
|
pub ring_assignation_data: Vec<CompactNodeType>,
|
||||||
|
|
||||||
/// Parameters to be used in the next partition assignation computation.
|
/// Parameters to be used in the next partition assignation computation.
|
||||||
pub staged_parameters: Lww<LayoutParameters>,
|
pub staging_parameters: Lww<LayoutParameters>,
|
||||||
/// Role changes which are staged for the next version of the layout
|
/// Role changes which are staged for the next version of the layout
|
||||||
pub staging: LwwMap<Uuid, NodeRoleV>,
|
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
pub staging_hash: Hash,
|
pub staging_hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
///This struct is used to set the parameters to be used in the assignation computation
|
/// This struct is used to set the parameters to be used in the assignation computation
|
||||||
///algorithm. It is stored as a Crdt.
|
/// algorithm. It is stored as a Crdt.
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct LayoutParameters {
|
pub struct LayoutParameters {
|
||||||
pub zone_redundancy: usize,
|
pub zone_redundancy: usize,
|
||||||
|
@ -114,20 +114,19 @@ impl NodeRole {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
|
// Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
pub fn new(replication_factor: usize) -> Self {
|
pub fn new(replication_factor: usize) -> Self {
|
||||||
//We set the default zone redundancy to be equal to the replication factor,
|
// We set the default zone redundancy to be equal to the replication factor,
|
||||||
//i.e. as strict as possible.
|
// i.e. as strict as possible.
|
||||||
let parameters = LayoutParameters {
|
let parameters = LayoutParameters {
|
||||||
zone_redundancy: replication_factor,
|
zone_redundancy: replication_factor,
|
||||||
};
|
};
|
||||||
let staged_parameters = Lww::<LayoutParameters>::new(parameters.clone());
|
let staging_parameters = Lww::<LayoutParameters>::new(parameters.clone());
|
||||||
|
|
||||||
let empty_lwwmap = LwwMap::new();
|
let empty_lwwmap = LwwMap::new();
|
||||||
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
|
|
||||||
|
|
||||||
ClusterLayout {
|
let mut ret = ClusterLayout {
|
||||||
version: 0,
|
version: 0,
|
||||||
replication_factor,
|
replication_factor,
|
||||||
partition_size: 0,
|
partition_size: 0,
|
||||||
|
@ -135,10 +134,17 @@ impl ClusterLayout {
|
||||||
node_id_vec: Vec::new(),
|
node_id_vec: Vec::new(),
|
||||||
ring_assignation_data: Vec::new(),
|
ring_assignation_data: Vec::new(),
|
||||||
parameters,
|
parameters,
|
||||||
staged_parameters,
|
staging_parameters,
|
||||||
staging: empty_lwwmap,
|
staging_roles: empty_lwwmap,
|
||||||
staging_hash: empty_lwwmap_hash,
|
staging_hash: [0u8; 32].into(),
|
||||||
}
|
};
|
||||||
|
ret.staging_hash = ret.calculate_staging_hash();
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
fn calculate_staging_hash(&self) -> Hash {
|
||||||
|
let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
|
||||||
|
blake2sum(&rmp_to_vec_all_named(&hashed_tuple).unwrap()[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
|
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
|
||||||
|
@ -148,16 +154,15 @@ impl ClusterLayout {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
Ordering::Equal => {
|
Ordering::Equal => {
|
||||||
let param_changed = self.staged_parameters.get() != other.staged_parameters.get();
|
self.staging_parameters.merge(&other.staging_parameters);
|
||||||
self.staged_parameters.merge(&other.staged_parameters);
|
self.staging_roles.merge(&other.staging_roles);
|
||||||
self.staging.merge(&other.staging);
|
|
||||||
|
|
||||||
let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
let new_staging_hash = self.calculate_staging_hash();
|
||||||
let stage_changed = new_staging_hash != self.staging_hash;
|
let changed = new_staging_hash != self.staging_hash;
|
||||||
|
|
||||||
self.staging_hash = new_staging_hash;
|
self.staging_hash = new_staging_hash;
|
||||||
|
|
||||||
stage_changed || param_changed
|
changed
|
||||||
}
|
}
|
||||||
Ordering::Less => false,
|
Ordering::Less => false,
|
||||||
}
|
}
|
||||||
|
@ -179,13 +184,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.roles.merge(&self.staging);
|
self.roles.merge(&self.staging_roles);
|
||||||
self.roles.retain(|(_, _, v)| v.0.is_some());
|
self.roles.retain(|(_, _, v)| v.0.is_some());
|
||||||
|
self.parameters = self.staging_parameters.get().clone();
|
||||||
|
|
||||||
let msg = self.calculate_partition_assignation()?;
|
let msg = self.calculate_partition_assignation()?;
|
||||||
|
|
||||||
self.staging.clear();
|
self.staging_roles.clear();
|
||||||
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
self.staging_hash = self.calculate_staging_hash();
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
|
@ -208,9 +214,9 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.staging.clear();
|
self.staging_roles.clear();
|
||||||
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
self.staging_hash = self.calculate_staging_hash();
|
||||||
self.staged_parameters.update(self.parameters.clone());
|
self.staging_parameters.update(self.parameters.clone());
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
|
@ -235,7 +241,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Returns the uuids of the non_gateway nodes in self.node_id_vec.
|
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
|
||||||
pub fn nongateway_nodes(&self) -> Vec<Uuid> {
|
pub fn nongateway_nodes(&self) -> Vec<Uuid> {
|
||||||
let mut result = Vec::<Uuid>::new();
|
let mut result = Vec::<Uuid>::new();
|
||||||
for uuid in self.node_id_vec.iter() {
|
for uuid in self.node_id_vec.iter() {
|
||||||
|
@ -247,7 +253,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
///Given a node uuids, this function returns the label of its zone
|
/// Given a node uuids, this function returns the label of its zone
|
||||||
pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
|
pub fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
|
||||||
match self.node_role(uuid) {
|
match self.node_role(uuid) {
|
||||||
Some(role) => Ok(role.zone.clone()),
|
Some(role) => Ok(role.zone.clone()),
|
||||||
|
@ -257,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Given a node uuids, this function returns its capacity or fails if it does not have any
|
/// Given a node uuids, this function returns its capacity or fails if it does not have any
|
||||||
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> {
|
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> {
|
||||||
match self.node_role(uuid) {
|
match self.node_role(uuid) {
|
||||||
Some(NodeRole {
|
Some(NodeRole {
|
||||||
|
@ -273,7 +279,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Returns the number of partitions associated to this node in the ring
|
/// Returns the number of partitions associated to this node in the ring
|
||||||
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
|
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
|
||||||
for (i, id) in self.node_id_vec.iter().enumerate() {
|
for (i, id) in self.node_id_vec.iter().enumerate() {
|
||||||
if id == uuid {
|
if id == uuid {
|
||||||
|
@ -293,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
///Returns the sum of capacities of non gateway nodes in the cluster
|
/// Returns the sum of capacities of non gateway nodes in the cluster
|
||||||
pub fn get_total_capacity(&self) -> Result<u32, Error> {
|
pub fn get_total_capacity(&self) -> Result<u32, Error> {
|
||||||
let mut total_capacity = 0;
|
let mut total_capacity = 0;
|
||||||
for uuid in self.nongateway_nodes().iter() {
|
for uuid in self.nongateway_nodes().iter() {
|
||||||
|
@ -307,7 +313,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
/// returns true if consistent, false if error
|
/// returns true if consistent, false if error
|
||||||
pub fn check(&self) -> bool {
|
pub fn check(&self) -> bool {
|
||||||
// Check that the hash of the staging data is correct
|
// Check that the hash of the staging data is correct
|
||||||
let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
let staging_hash = self.calculate_staging_hash();
|
||||||
if staging_hash != self.staging_hash {
|
if staging_hash != self.staging_hash {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -346,14 +352,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check that every partition is associated to distinct nodes
|
// Check that every partition is associated to distinct nodes
|
||||||
let rf = self.replication_factor;
|
let rf = self.replication_factor;
|
||||||
for p in 0..(1 << PARTITION_BITS) {
|
for p in 0..(1 << PARTITION_BITS) {
|
||||||
let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
|
let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
|
||||||
if nodes_of_p.iter().unique().count() != rf {
|
if nodes_of_p.iter().unique().count() != rf {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
//Check that every partition is spread over at least zone_redundancy zones.
|
// Check that every partition is spread over at least zone_redundancy zones.
|
||||||
let zones_of_p = nodes_of_p.iter().map(|n| {
|
let zones_of_p = nodes_of_p.iter().map(|n| {
|
||||||
self.get_node_zone(&self.node_id_vec[*n as usize])
|
self.get_node_zone(&self.node_id_vec[*n as usize])
|
||||||
.expect("Zone not found.")
|
.expect("Zone not found.")
|
||||||
|
@ -364,7 +370,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check that the nodes capacities is consistent with the stored partitions
|
// Check that the nodes capacities is consistent with the stored partitions
|
||||||
let mut node_usage = vec![0; MAX_NODE_NUMBER];
|
let mut node_usage = vec![0; MAX_NODE_NUMBER];
|
||||||
for n in self.ring_assignation_data.iter() {
|
for n in self.ring_assignation_data.iter() {
|
||||||
node_usage[*n as usize] += 1;
|
node_usage[*n as usize] += 1;
|
||||||
|
@ -380,8 +386,8 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Check that the partition size stored is the one computed by the asignation
|
// Check that the partition size stored is the one computed by the asignation
|
||||||
//algorithm.
|
// algorithm.
|
||||||
let cl2 = self.clone();
|
let cl2 = self.clone();
|
||||||
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
|
let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error");
|
||||||
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
match cl2.compute_optimal_partition_size(&zone_to_id) {
|
||||||
|
@ -394,7 +400,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Implementation of the ClusterLayout methods related to the assignation algorithm.
|
// Implementation of the ClusterLayout methods related to the assignation algorithm.
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
/// This function calculates a new partition-to-node assignation.
|
/// This function calculates a new partition-to-node assignation.
|
||||||
/// The computed assignation respects the node replication factor
|
/// The computed assignation respects the node replication factor
|
||||||
|
@ -403,16 +409,13 @@ impl ClusterLayout {
|
||||||
/// Among such optimal assignation, it minimizes the distance to
|
/// Among such optimal assignation, it minimizes the distance to
|
||||||
/// the former assignation (if any) to minimize the amount of
|
/// the former assignation (if any) to minimize the amount of
|
||||||
/// data to be moved.
|
/// data to be moved.
|
||||||
// Staged role changes must be merged with nodes roles before calling this function,
|
/// Staged role changes must be merged with nodes roles before calling this function,
|
||||||
// hence it must only be called from apply_staged_changes() and hence is not public.
|
/// hence it must only be called from apply_staged_changes() and hence is not public.
|
||||||
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
|
fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
|
||||||
//We update the node ids, since the node role list might have changed with the
|
// We update the node ids, since the node role list might have changed with the
|
||||||
//changes in the layout. We retrieve the old_assignation reframed with new ids
|
// changes in the layout. We retrieve the old_assignation reframed with new ids
|
||||||
let old_assignation_opt = self.update_node_id_vec()?;
|
let old_assignation_opt = self.update_node_id_vec()?;
|
||||||
|
|
||||||
//We update the parameters
|
|
||||||
self.parameters = self.staged_parameters.get().clone();
|
|
||||||
|
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into());
|
msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into());
|
||||||
msg.push("".into());
|
msg.push("".into());
|
||||||
|
@ -422,8 +425,8 @@ impl ClusterLayout {
|
||||||
self.replication_factor, self.parameters.zone_redundancy
|
self.replication_factor, self.parameters.zone_redundancy
|
||||||
));
|
));
|
||||||
|
|
||||||
//We generate for once numerical ids for the zones of non gateway nodes,
|
// We generate for once numerical ids for the zones of non gateway nodes,
|
||||||
//to use them as indices in the flow graphs.
|
// to use them as indices in the flow graphs.
|
||||||
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
|
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
|
||||||
|
|
||||||
let nb_nongateway_nodes = self.nongateway_nodes().len();
|
let nb_nongateway_nodes = self.nongateway_nodes().len();
|
||||||
|
@ -443,10 +446,10 @@ impl ClusterLayout {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute the optimal partition size
|
// We compute the optimal partition size
|
||||||
//Capacities should be given in a unit so that partition size is at least 100.
|
// Capacities should be given in a unit so that partition size is at least 100.
|
||||||
//In this case, integer rounding plays a marginal role in the percentages of
|
// In this case, integer rounding plays a marginal role in the percentages of
|
||||||
//optimality.
|
// optimality.
|
||||||
let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
|
let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
|
||||||
|
|
||||||
if old_assignation_opt != None {
|
if old_assignation_opt != None {
|
||||||
|
@ -461,7 +464,7 @@ impl ClusterLayout {
|
||||||
partition_size
|
partition_size
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
//We write the partition size.
|
// We write the partition size.
|
||||||
self.partition_size = partition_size;
|
self.partition_size = partition_size;
|
||||||
|
|
||||||
if partition_size < 100 {
|
if partition_size < 100 {
|
||||||
|
@ -472,15 +475,15 @@ impl ClusterLayout {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute a first flow/assignation that is heuristically close to the previous
|
// We compute a first flow/assignation that is heuristically close to the previous
|
||||||
//assignation
|
// assignation
|
||||||
let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
|
let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
|
||||||
if let Some(assoc) = &old_assignation_opt {
|
if let Some(assoc) = &old_assignation_opt {
|
||||||
//We minimize the distance to the previous assignation.
|
// We minimize the distance to the previous assignation.
|
||||||
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
|
self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
//We display statistics of the computation
|
// We display statistics of the computation
|
||||||
msg.append(&mut self.output_stat(
|
msg.append(&mut self.output_stat(
|
||||||
&gflow,
|
&gflow,
|
||||||
&old_assignation_opt,
|
&old_assignation_opt,
|
||||||
|
@ -489,7 +492,7 @@ impl ClusterLayout {
|
||||||
)?);
|
)?);
|
||||||
msg.push("".to_string());
|
msg.push("".to_string());
|
||||||
|
|
||||||
//We update the layout structure
|
// We update the layout structure
|
||||||
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
|
self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
|
||||||
|
|
||||||
if !self.check() {
|
if !self.check() {
|
||||||
|
@ -508,8 +511,8 @@ impl ClusterLayout {
|
||||||
/// do modify assignation_ring and node_id_vec.
|
/// do modify assignation_ring and node_id_vec.
|
||||||
fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
|
fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
|
||||||
// (1) We compute the new node list
|
// (1) We compute the new node list
|
||||||
//Non gateway nodes should be coded on 8bits, hence they must be first in the list
|
// Non gateway nodes should be coded on 8bits, hence they must be first in the list
|
||||||
//We build the new node ids
|
// We build the new node ids
|
||||||
let mut new_non_gateway_nodes: Vec<Uuid> = self
|
let mut new_non_gateway_nodes: Vec<Uuid> = self
|
||||||
.roles
|
.roles
|
||||||
.items()
|
.items()
|
||||||
|
@ -542,12 +545,12 @@ impl ClusterLayout {
|
||||||
self.node_id_vec = new_node_id_vec.clone();
|
self.node_id_vec = new_node_id_vec.clone();
|
||||||
|
|
||||||
// (2) We retrieve the old association
|
// (2) We retrieve the old association
|
||||||
//We rewrite the old association with the new indices. We only consider partition
|
// We rewrite the old association with the new indices. We only consider partition
|
||||||
//to node assignations where the node is still in use.
|
// to node assignations where the node is still in use.
|
||||||
let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
|
let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
|
||||||
|
|
||||||
if self.ring_assignation_data.is_empty() {
|
if self.ring_assignation_data.is_empty() {
|
||||||
//This is a new association
|
// This is a new association
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
|
if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
|
||||||
|
@ -558,11 +561,11 @@ impl ClusterLayout {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
//We build a translation table between the uuid and new ids
|
// We build a translation table between the uuid and new ids
|
||||||
let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
|
let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
|
||||||
|
|
||||||
//We add the indices of only the new non-gateway nodes that can be used in the
|
// We add the indices of only the new non-gateway nodes that can be used in the
|
||||||
//association ring
|
// association ring
|
||||||
for (i, uuid) in new_node_id_vec.iter().enumerate() {
|
for (i, uuid) in new_node_id_vec.iter().enumerate() {
|
||||||
uuid_to_new_id.insert(*uuid, i);
|
uuid_to_new_id.insert(*uuid, i);
|
||||||
}
|
}
|
||||||
|
@ -577,14 +580,14 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//We write the ring
|
// We write the ring
|
||||||
self.ring_assignation_data = Vec::<CompactNodeType>::new();
|
self.ring_assignation_data = Vec::<CompactNodeType>::new();
|
||||||
|
|
||||||
Ok(Some(old_assignation))
|
Ok(Some(old_assignation))
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function generates ids for the zone of the nodes appearing in
|
/// This function generates ids for the zone of the nodes appearing in
|
||||||
///self.node_id_vec.
|
/// self.node_id_vec.
|
||||||
fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
|
fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
|
||||||
let mut id_to_zone = Vec::<String>::new();
|
let mut id_to_zone = Vec::<String>::new();
|
||||||
let mut zone_to_id = HashMap::<String, usize>::new();
|
let mut zone_to_id = HashMap::<String, usize>::new();
|
||||||
|
@ -607,8 +610,8 @@ impl ClusterLayout {
|
||||||
Ok((id_to_zone, zone_to_id))
|
Ok((id_to_zone, zone_to_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function computes by dichotomy the largest realizable partition size, given
|
/// This function computes by dichotomy the largest realizable partition size, given
|
||||||
///the layout roles and parameters.
|
/// the layout roles and parameters.
|
||||||
fn compute_optimal_partition_size(
|
fn compute_optimal_partition_size(
|
||||||
&self,
|
&self,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
|
@ -662,13 +665,13 @@ impl ClusterLayout {
|
||||||
vertices
|
vertices
|
||||||
}
|
}
|
||||||
|
|
||||||
///Generates the graph to compute the maximal flow corresponding to the optimal
|
/// Generates the graph to compute the maximal flow corresponding to the optimal
|
||||||
///partition assignation.
|
/// partition assignation.
|
||||||
///exclude_assoc is the set of (partition, node) association that we are forbidden
|
/// exclude_assoc is the set of (partition, node) association that we are forbidden
|
||||||
///to use (hence we do not add the corresponding edge to the graph). This parameter
|
/// to use (hence we do not add the corresponding edge to the graph). This parameter
|
||||||
///is used to compute a first flow that uses only edges appearing in the previous
|
/// is used to compute a first flow that uses only edges appearing in the previous
|
||||||
///assignation. This produces a solution that heuristically should be close to the
|
/// assignation. This produces a solution that heuristically should be close to the
|
||||||
///previous one.
|
/// previous one.
|
||||||
fn generate_flow_graph(
|
fn generate_flow_graph(
|
||||||
&self,
|
&self,
|
||||||
partition_size: u32,
|
partition_size: u32,
|
||||||
|
@ -709,14 +712,14 @@ impl ClusterLayout {
|
||||||
Ok(g)
|
Ok(g)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function computes a first optimal assignation (in the form of a flow graph).
|
/// This function computes a first optimal assignation (in the form of a flow graph).
|
||||||
fn compute_candidate_assignation(
|
fn compute_candidate_assignation(
|
||||||
&self,
|
&self,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
prev_assign_opt: &Option<Vec<Vec<usize>>>,
|
prev_assign_opt: &Option<Vec<Vec<usize>>>,
|
||||||
) -> Result<Graph<FlowEdge>, Error> {
|
) -> Result<Graph<FlowEdge>, Error> {
|
||||||
//We list the (partition,node) associations that are not used in the
|
// We list the (partition,node) associations that are not used in the
|
||||||
//previous assignation
|
// previous assignation
|
||||||
let mut exclude_edge = HashSet::<(usize, usize)>::new();
|
let mut exclude_edge = HashSet::<(usize, usize)>::new();
|
||||||
if let Some(prev_assign) = prev_assign_opt {
|
if let Some(prev_assign) = prev_assign_opt {
|
||||||
let nb_nodes = self.nongateway_nodes().len();
|
let nb_nodes = self.nongateway_nodes().len();
|
||||||
|
@ -730,13 +733,13 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute the best flow using only the edges used in the previous assignation
|
// We compute the best flow using only the edges used in the previous assignation
|
||||||
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
|
let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
|
|
||||||
//We add the excluded edges and compute the maximal flow with the full graph.
|
// We add the excluded edges and compute the maximal flow with the full graph.
|
||||||
//The algorithm is such that it will start with the flow that we just computed
|
// The algorithm is such that it will start with the flow that we just computed
|
||||||
//and find ameliorating paths from that.
|
// and find ameliorating paths from that.
|
||||||
for (p, n) in exclude_edge.iter() {
|
for (p, n) in exclude_edge.iter() {
|
||||||
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
|
||||||
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
||||||
|
@ -745,16 +748,16 @@ impl ClusterLayout {
|
||||||
Ok(g)
|
Ok(g)
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function updates the flow graph gflow to minimize the distance between
|
/// This function updates the flow graph gflow to minimize the distance between
|
||||||
///its corresponding assignation and the previous one
|
/// its corresponding assignation and the previous one
|
||||||
fn minimize_rebalance_load(
|
fn minimize_rebalance_load(
|
||||||
&self,
|
&self,
|
||||||
gflow: &mut Graph<FlowEdge>,
|
gflow: &mut Graph<FlowEdge>,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
prev_assign: &[Vec<usize>],
|
prev_assign: &[Vec<usize>],
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
//We define a cost function on the edges (pairs of vertices) corresponding
|
// We define a cost function on the edges (pairs of vertices) corresponding
|
||||||
//to the distance between the two assignations.
|
// to the distance between the two assignations.
|
||||||
let mut cost = CostFunction::new();
|
let mut cost = CostFunction::new();
|
||||||
for (p, assoc_p) in prev_assign.iter().enumerate() {
|
for (p, assoc_p) in prev_assign.iter().enumerate() {
|
||||||
for n in assoc_p.iter() {
|
for n in assoc_p.iter() {
|
||||||
|
@ -763,9 +766,9 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//We compute the maximal length of a simple path in gflow. It is used in the
|
// We compute the maximal length of a simple path in gflow. It is used in the
|
||||||
//Bellman-Ford algorithm in optimize_flow_with_cost to set the number
|
// Bellman-Ford algorithm in optimize_flow_with_cost to set the number
|
||||||
//of iterations.
|
// of iterations.
|
||||||
let nb_nodes = self.nongateway_nodes().len();
|
let nb_nodes = self.nongateway_nodes().len();
|
||||||
let path_length = 4 * nb_nodes;
|
let path_length = 4 * nb_nodes;
|
||||||
gflow.optimize_flow_with_cost(&cost, path_length)?;
|
gflow.optimize_flow_with_cost(&cost, path_length)?;
|
||||||
|
@ -773,7 +776,7 @@ impl ClusterLayout {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function updates the assignation ring from the flow graph.
|
/// This function updates the assignation ring from the flow graph.
|
||||||
fn update_ring_from_flow(
|
fn update_ring_from_flow(
|
||||||
&mut self,
|
&mut self,
|
||||||
nb_zones: usize,
|
nb_zones: usize,
|
||||||
|
@ -801,8 +804,8 @@ impl ClusterLayout {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///This function returns a message summing up the partition repartition of the new
|
/// This function returns a message summing up the partition repartition of the new
|
||||||
///layout, and other statistics of the partition assignation computation.
|
/// layout, and other statistics of the partition assignation computation.
|
||||||
fn output_stat(
|
fn output_stat(
|
||||||
&self,
|
&self,
|
||||||
gflow: &Graph<FlowEdge>,
|
gflow: &Graph<FlowEdge>,
|
||||||
|
@ -837,7 +840,7 @@ impl ClusterLayout {
|
||||||
used_cap / self.replication_factor as u32
|
used_cap / self.replication_factor as u32
|
||||||
));
|
));
|
||||||
|
|
||||||
//We define and fill in the following tables
|
// We define and fill in the following tables
|
||||||
let storing_nodes = self.nongateway_nodes();
|
let storing_nodes = self.nongateway_nodes();
|
||||||
let mut new_partitions = vec![0; storing_nodes.len()];
|
let mut new_partitions = vec![0; storing_nodes.len()];
|
||||||
let mut stored_partitions = vec![0; storing_nodes.len()];
|
let mut stored_partitions = vec![0; storing_nodes.len()];
|
||||||
|
@ -879,7 +882,7 @@ impl ClusterLayout {
|
||||||
new_partitions_zone = stored_partitions_zone.clone();
|
new_partitions_zone = stored_partitions_zone.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
//We display the statistics
|
// We display the statistics
|
||||||
|
|
||||||
msg.push("".into());
|
msg.push("".into());
|
||||||
if *prev_assign_opt != None {
|
if *prev_assign_opt != None {
|
||||||
|
@ -951,27 +954,27 @@ impl ClusterLayout {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//====================================================================================
|
// ====================================================================================
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::{Error, *};
|
use super::{Error, *};
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
|
|
||||||
//This function checks that the partition size S computed is at least better than the
|
// This function checks that the partition size S computed is at least better than the
|
||||||
//one given by a very naive algorithm. To do so, we try to run the naive algorithm
|
// one given by a very naive algorithm. To do so, we try to run the naive algorithm
|
||||||
//assuming a partion size of S+1. If we succed, it means that the optimal assignation
|
// assuming a partition size of S+1. If we succeed, it means that the optimal assignation
|
||||||
//was not optimal. The naive algorithm is the following :
|
// was not optimal. The naive algorithm is the following :
|
||||||
//- we compute the max number of partitions associated to every node, capped at the
|
// - we compute the max number of partitions associated to every node, capped at the
|
||||||
//partition number. It gives the number of tokens of every node.
|
// partition number. It gives the number of tokens of every node.
|
||||||
//- every zone has a number of tokens equal to the sum of the tokens of its nodes.
|
// - every zone has a number of tokens equal to the sum of the tokens of its nodes.
|
||||||
//- we cycle over the partitions and associate zone tokens while respecting the
|
// - we cycle over the partitions and associate zone tokens while respecting the
|
||||||
//zone redundancy constraint.
|
// zone redundancy constraint.
|
||||||
//NOTE: the naive algorithm is not optimal. Counter example:
|
// NOTE: the naive algorithm is not optimal. Counter example:
|
||||||
//take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
|
// take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
|
||||||
//number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
|
// number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
|
||||||
//With these parameters, the naive algo fails, whereas there is a solution:
|
// With these parameters, the naive algo fails, whereas there is a solution:
|
||||||
//(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
|
// (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
|
||||||
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
|
fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
|
||||||
let over_size = cl.partition_size + 1;
|
let over_size = cl.partition_size + 1;
|
||||||
let mut zone_token = HashMap::<String, usize>::new();
|
let mut zone_token = HashMap::<String, usize>::new();
|
||||||
|
@ -994,8 +997,8 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
//For every partition, we count the number of zone already associated and
|
// For every partition, we count the number of zones already associated and
|
||||||
//the name of the last zone associated
|
// the name of the last zone associated
|
||||||
|
|
||||||
let mut id_zone_token = vec![0; zones.len()];
|
let mut id_zone_token = vec![0; zones.len()];
|
||||||
for (z, t) in zone_token.iter() {
|
for (z, t) in zone_token.iter() {
|
||||||
|
@ -1049,7 +1052,7 @@ mod tests {
|
||||||
cl.node_id_vec.push(x);
|
cl.node_id_vec.push(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
let update = cl.staging.update_mutator(
|
let update = cl.staging_roles.update_mutator(
|
||||||
cl.node_id_vec[i],
|
cl.node_id_vec[i],
|
||||||
NodeRoleV(Some(NodeRole {
|
NodeRoleV(Some(NodeRole {
|
||||||
zone: (node_zone_vec[i].to_string()),
|
zone: (node_zone_vec[i].to_string()),
|
||||||
|
@ -1057,10 +1060,10 @@ mod tests {
|
||||||
tags: (vec![]),
|
tags: (vec![]),
|
||||||
})),
|
})),
|
||||||
);
|
);
|
||||||
cl.staging.merge(&update);
|
cl.staging_roles.merge(&update);
|
||||||
}
|
}
|
||||||
cl.staging_hash = blake2sum(&rmp_to_vec_all_named(&cl.staging).unwrap()[..]);
|
cl.staging_hash = cl.calculate_staging_hash();
|
||||||
cl.staged_parameters
|
cl.staging_parameters
|
||||||
.update(LayoutParameters { zone_redundancy });
|
.update(LayoutParameters { zone_redundancy });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue