diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 61bfb8c5..040778b1 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -86,7 +86,7 @@ fn get_cluster_layout(garage: &Arc) -> GetClusterLayoutResponse { .map(|(k, _, v)| (hex::encode(k), v.0.clone())) .collect(), staged_role_changes: layout - .staging + .staging_roles .items() .iter() .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) @@ -137,14 +137,14 @@ pub async fn handle_update_cluster_layout( let mut layout = garage.system.get_cluster_layout(); let mut roles = layout.roles.clone(); - roles.merge(&layout.staging); + roles.merge(&layout.staging_roles); for (node, role) in updates { let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?; let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; layout - .staging + .staging_roles .merge(&roles.update_mutator(node, NodeRoleV(role))); } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index c8b96489..e352ddf2 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -71,7 +71,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> )); } _ => { - let new_role = match layout.staging.get(&adv.id) { + let new_role = match layout.staging_roles.get(&adv.id) { Some(NodeRoleV(Some(_))) => "(pending)", _ => "NO ROLE ASSIGNED", }; diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 5056e57d..4b23a096 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -63,14 +63,14 @@ pub async fn cmd_assign_role( .collect::, _>>()?; let mut roles = layout.roles.clone(); - roles.merge(&layout.staging); + roles.merge(&layout.staging_roles); for replaced in args.replace.iter() { let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout - .staging + .staging_roles .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); } _ => { @@ -128,7 
+128,7 @@ pub async fn cmd_assign_role( }; layout - .staging + .staging_roles .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); } @@ -148,13 +148,13 @@ pub async fn cmd_remove_role( let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut roles = layout.roles.clone(); - roles.merge(&layout.staging); + roles.merge(&layout.staging_roles); let deleted_node = find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?; layout - .staging + .staging_roles .merge(&roles.update_mutator(deleted_node, NodeRoleV(None))); send_layout(rpc_cli, rpc_host, layout).await?; @@ -278,7 +278,7 @@ pub async fn cmd_config_layout( println!("The zone redundancy must be at least 1."); } else { layout - .staged_parameters + .staging_parameters .update(LayoutParameters { zone_redundancy: r }); println!("The new zone redundancy has been saved ({}).", r); } @@ -352,13 +352,13 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { } pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { - let has_changes = layout.staged_parameters.get().clone() != layout.parameters; + let has_changes = layout.staging_parameters.get().clone() != layout.parameters; if has_changes { println!(); println!("==== NEW LAYOUT PARAMETERS ===="); println!( "Zone redundancy: {}", - layout.staged_parameters.get().zone_redundancy + layout.staging_parameters.get().zone_redundancy ); println!(); } @@ -367,7 +367,7 @@ pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool { pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { let has_changes = layout - .staging + .staging_roles .items() .iter() .any(|(k, _, v)| layout.roles.get(k) != Some(v)); @@ -376,7 +376,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { println!(); println!("==== STAGED ROLE CHANGES ===="); let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in layout.staging.items().iter() { + for (id, _, role) in 
layout.staging_roles.items().iter() { if layout.roles.get(id) == Some(role) { continue; } diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index 5bd6cc51..1e4a819b 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -6,33 +6,33 @@ use std::cmp::{max, min}; use std::collections::HashMap; use std::collections::VecDeque; -///Vertex data structures used in all the graphs used in layout.rs. -///usize parameters correspond to node/zone/partitions ids. -///To understand the vertex roles below, please refer to the formal description -///of the layout computation algorithm. +/// Vertex data structures used in all the graphs used in layout.rs. +/// usize parameters correspond to node/zone/partitions ids. +/// To understand the vertex roles below, please refer to the formal description +/// of the layout computation algorithm. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum Vertex { Source, - Pup(usize), //The vertex p+ of partition p - Pdown(usize), //The vertex p- of partition p - PZ(usize, usize), //The vertex corresponding to x_(partition p, zone z) - N(usize), //The vertex corresponding to node n + Pup(usize), // The vertex p+ of partition p + Pdown(usize), // The vertex p- of partition p + PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z) + N(usize), // The vertex corresponding to node n Sink, } -///Edge data structure for the flow algorithm. +/// Edge data structure for the flow algorithm. 
#[derive(Clone, Copy, Debug)] pub struct FlowEdge { - cap: u32, //flow maximal capacity of the edge - flow: i32, //flow value on the edge - dest: usize, //destination vertex id - rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v + cap: u32, // flow maximal capacity of the edge + flow: i32, // flow value on the edge + dest: usize, // destination vertex id + rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v } -///Edge data structure for the detection of negative cycles. +/// Edge data structure for the detection of negative cycles. #[derive(Clone, Copy, Debug)] pub struct WeightedEdge { - w: i32, //weight of the edge + w: i32, // weight of the edge dest: usize, } @@ -40,14 +40,14 @@ pub trait Edge: Clone + Copy {} impl Edge for FlowEdge {} impl Edge for WeightedEdge {} -///Struct for the graph structure. We do encapsulation here to be able to both -///provide user friendly Vertex enum to address vertices, and to use internally usize -///indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. +/// Struct for the graph structure. We do encapsulation here to be able to both +/// provide user friendly Vertex enum to address vertices, and to use internally usize +/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed. 
pub struct Graph { - vertextoid: HashMap, - idtovertex: Vec, + vertex_to_id: HashMap, + id_to_vertex: Vec, - //The graph is stored as an adjacency list + // The graph is stored as an adjacency list graph: Vec>, } @@ -60,22 +60,30 @@ impl Graph { map.insert(*vert, i); } Graph:: { - vertextoid: map, - idtovertex: vertices.to_vec(), + vertex_to_id: map, + id_to_vertex: vertices.to_vec(), graph: vec![Vec::::new(); vertices.len()], } } + + fn get_vertex_id(&self, v: &Vertex) -> Result { + self.vertex_to_id + .get(v) + .cloned() + .ok_or_else(|| format!("The graph does not contain vertex {:?}", v)) + } } impl Graph { - ///This function adds a directed edge to the graph with capacity c, and the - ///corresponding reversed edge with capacity 0. + /// This function adds a directed edge to the graph with capacity c, and the + /// corresponding reversed edge with capacity 0. pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> { - if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; + if idu == idv { + return Err("Cannot add edge from vertex to itself in flow graph".into()); } - let idu = self.vertextoid[&u]; - let idv = self.vertextoid[&v]; + let rev_u = self.graph[idu].len(); let rev_v = self.graph[idv].len(); self.graph[idu].push(FlowEdge { @@ -93,28 +101,22 @@ impl Graph { Ok(()) } - ///This function returns the list of vertices that receive a positive flow from - ///vertex v. + /// This function returns the list of vertices that receive a positive flow from + /// vertex v. 
pub fn get_positive_flow_from(&self, v: Vertex) -> Result, String> { - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; + let idv = self.get_vertex_id(&v)?; let mut result = Vec::::new(); for edge in self.graph[idv].iter() { if edge.flow > 0 { - result.push(self.idtovertex[edge.dest]); + result.push(self.id_to_vertex[edge.dest]); } } Ok(result) } - ///This function returns the value of the flow incoming to v. + /// This function returns the value of the flow incoming to v. pub fn get_inflow(&self, v: Vertex) -> Result { - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; + let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { result += max(0, self.graph[edge.dest][edge.rev].flow); @@ -122,12 +124,9 @@ impl Graph { Ok(result) } - ///This function returns the value of the flow outgoing from v. + /// This function returns the value of the flow outgoing from v. pub fn get_outflow(&self, v: Vertex) -> Result { - if !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idv = self.vertextoid[&v]; + let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { result += max(0, edge.flow); @@ -135,19 +134,19 @@ impl Graph { Ok(result) } - ///This function computes the flow total value by computing the outgoing flow - ///from the source. + /// This function computes the flow total value by computing the outgoing flow + /// from the source. pub fn get_flow_value(&mut self) -> Result { self.get_outflow(Vertex::Source) } - ///This function shuffles the order of the edge lists. It keeps the ids of the - ///reversed edges consistent. + /// This function shuffles the order of the edge lists. 
It keeps the ids of the + /// reversed edges consistent. fn shuffle_edges(&mut self) { let mut rng = rand::thread_rng(); for i in 0..self.graph.len() { self.graph[i].shuffle(&mut rng); - //We need to update the ids of the reverse edges. + // We need to update the ids of the reverse edges. for j in 0..self.graph[i].len() { let target_v = self.graph[i][j].dest; let target_rev = self.graph[i][j].rev; @@ -156,97 +155,86 @@ impl Graph { } } - ///Computes an upper bound of the flow on the graph - pub fn flow_upper_bound(&self) -> u32 { - let idsource = self.vertextoid[&Vertex::Source]; + /// Computes an upper bound of the flow on the graph + pub fn flow_upper_bound(&self) -> Result { + let idsource = self.get_vertex_id(&Vertex::Source)?; let mut flow_upper_bound = 0; for edge in self.graph[idsource].iter() { flow_upper_bound += edge.cap; } - flow_upper_bound + Ok(flow_upper_bound) } - ///This function computes the maximal flow using Dinic's algorithm. It starts with - ///the flow values already present in the graph. So it is possible to add some edge to - ///the graph, compute a flow, add other edges, update the flow. + /// This function computes the maximal flow using Dinic's algorithm. It starts with + /// the flow values already present in the graph. So it is possible to add some edge to + /// the graph, compute a flow, add other edges, update the flow. 
pub fn compute_maximal_flow(&mut self) -> Result<(), String> { - if !self.vertextoid.contains_key(&Vertex::Source) { - return Err("The graph does not contain a source.".to_string()); - } - if !self.vertextoid.contains_key(&Vertex::Sink) { - return Err("The graph does not contain a sink.".to_string()); - } - - let idsource = self.vertextoid[&Vertex::Source]; - let idsink = self.vertextoid[&Vertex::Sink]; + let idsource = self.get_vertex_id(&Vertex::Source)?; + let idsink = self.get_vertex_id(&Vertex::Sink)?; let nb_vertices = self.graph.len(); - let flow_upper_bound = self.flow_upper_bound(); + let flow_upper_bound = self.flow_upper_bound()?; - //To ensure the dispersion of the associations generated by the - //assignation, we shuffle the neighbours of the nodes. Hence, - //the vertices do not consider their neighbours in the same order. + // To ensure the dispersion of the associations generated by the + // assignation, we shuffle the neighbours of the nodes. Hence, + // the vertices do not consider their neighbours in the same order. self.shuffle_edges(); - //We run Dinic's max flow algorithm + // We run Dinic's max flow algorithm loop { - //We build the level array from Dinic's algorithm. + // We build the level array from Dinic's algorithm. 
let mut level = vec![None; nb_vertices]; let mut fifo = VecDeque::new(); fifo.push_back((idsource, 0)); - while !fifo.is_empty() { - if let Some((id, lvl)) = fifo.pop_front() { - if level[id] == None { - //it means id has not yet been reached - level[id] = Some(lvl); - for edge in self.graph[id].iter() { - if edge.cap as i32 - edge.flow > 0 { - fifo.push_back((edge.dest, lvl + 1)); - } + while let Some((id, lvl)) = fifo.pop_front() { + if level[id] == None { + // it means id has not yet been reached + level[id] = Some(lvl); + for edge in self.graph[id].iter() { + if edge.cap as i32 - edge.flow > 0 { + fifo.push_back((edge.dest, lvl + 1)); } } } } if level[idsink] == None { - //There is no residual flow + // There is no residual flow break; } - //Now we run DFS respecting the level array + // Now we run DFS respecting the level array let mut next_nbd = vec![0; nb_vertices]; - let mut lifo = VecDeque::new(); + let mut lifo = Vec::new(); - lifo.push_back((idsource, flow_upper_bound)); + lifo.push((idsource, flow_upper_bound)); - while let Some((id_tmp, f_tmp)) = lifo.back() { - let id = *id_tmp; - let f = *f_tmp; + while let Some((id, f)) = lifo.last().cloned() { if id == idsink { - //The DFS reached the sink, we can add a - //residual flow. - lifo.pop_back(); - while let Some((id, _)) = lifo.pop_back() { + // The DFS reached the sink, we can add a + // residual flow. 
+ lifo.pop(); + while let Some((id, _)) = lifo.pop() { let nbd = next_nbd[id]; self.graph[id][nbd].flow += f as i32; let id_rev = self.graph[id][nbd].dest; let nbd_rev = self.graph[id][nbd].rev; self.graph[id_rev][nbd_rev].flow -= f as i32; } - lifo.push_back((idsource, flow_upper_bound)); + lifo.push((idsource, flow_upper_bound)); continue; } - //else we did not reach the sink + // else we did not reach the sink let nbd = next_nbd[id]; if nbd >= self.graph[id].len() { - //There is nothing to explore from id anymore - lifo.pop_back(); - if let Some((parent, _)) = lifo.back() { + // There is nothing to explore from id anymore + lifo.pop(); + if let Some((parent, _)) = lifo.last() { next_nbd[*parent] += 1; } continue; } - //else we can try to send flow from id to its nbd + // else we can try to send flow from id to its nbd let new_flow = min( f as i32, self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, @@ -257,19 +245,19 @@ impl Graph { } if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) { if lvldest <= lvlid { - //We cannot send flow to nbd. + // We cannot send flow to nbd. next_nbd[id] += 1; continue; } } - //otherwise, we send flow to nbd. - lifo.push_back((self.graph[id][nbd].dest, new_flow)); + // otherwise, we send flow to nbd. + lifo.push((self.graph[id][nbd].dest, new_flow)); } } Ok(()) } - ///This function takes a flow, and a cost function on the edges, and tries to find an + /// This function takes a flow, and a cost function on the edges, and tries to find an /// equivalent flow with a better cost, by finding improving overflow cycles. It uses /// as subroutine the Bellman Ford algorithm run up to path_length. 
/// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and @@ -279,19 +267,19 @@ impl Graph { cost: &CostFunction, path_length: usize, ) -> Result<(), String> { - //We build the weighted graph g where we will look for negative cycle + // We build the weighted graph g where we will look for negative cycle let mut gf = self.build_cost_graph(cost)?; let mut cycles = gf.list_negative_cycles(path_length); while !cycles.is_empty() { - //we enumerate negative cycles + // we enumerate negative cycles for c in cycles.iter() { for i in 0..c.len() { - //We add one flow unit to the edge (u,v) of cycle c - let idu = self.vertextoid[&c[i]]; - let idv = self.vertextoid[&c[(i + 1) % c.len()]]; + // We add one flow unit to the edge (u,v) of cycle c + let idu = self.vertex_to_id[&c[i]]; + let idv = self.vertex_to_id[&c[(i + 1) % c.len()]]; for j in 0..self.graph[idu].len() { - //since idu appears at most once in the cycles, we enumerate every - //edge at most once. + // since idu appears at most once in the cycles, we enumerate every + // edge at most once. 
let edge = self.graph[idu][j]; if edge.dest == idv { self.graph[idu][j].flow += 1; @@ -308,16 +296,16 @@ impl Graph { Ok(()) } - ///Construct the weighted graph G_f from the flow and the cost function + /// Construct the weighted graph G_f from the flow and the cost function fn build_cost_graph(&self, cost: &CostFunction) -> Result, String> { - let mut g = Graph::::new(&self.idtovertex); - let nb_vertices = self.idtovertex.len(); + let mut g = Graph::::new(&self.id_to_vertex); + let nb_vertices = self.id_to_vertex.len(); for i in 0..nb_vertices { for edge in self.graph[i].iter() { if edge.cap as i32 - edge.flow > 0 { - //It is possible to send overflow through this edge - let u = self.idtovertex[i]; - let v = self.idtovertex[edge.dest]; + // It is possible to send overflow through this edge + let u = self.id_to_vertex[i]; + let v = self.id_to_vertex[edge.dest]; if cost.contains_key(&(u, v)) { g.add_edge(u, v, cost[&(u, v)])?; } else if cost.contains_key(&(v, u)) { @@ -333,29 +321,26 @@ impl Graph { } impl Graph { - ///This function adds a single directed weighted edge to the graph. + /// This function adds a single directed weighted edge to the graph. pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> { - if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { - return Err("The graph does not contain the provided vertex.".to_string()); - } - let idu = self.vertextoid[&u]; - let idv = self.vertextoid[&v]; + let idu = self.get_vertex_id(&u)?; + let idv = self.get_vertex_id(&v)?; self.graph[idu].push(WeightedEdge { w, dest: idv }); Ok(()) } - ///This function lists the negative cycles it manages to find after path_length - ///iterations of the main loop of the Bellman-Ford algorithm. For the classical - ///algorithm, path_length needs to be equal to the number of vertices. 
However, - ///for particular graph structures like in our case, the algorithm is still correct - ///when path_length is the length of the longest possible simple path. - ///See the formal description of the algorithm for more details. + /// This function lists the negative cycles it manages to find after path_length + /// iterations of the main loop of the Bellman-Ford algorithm. For the classical + /// algorithm, path_length needs to be equal to the number of vertices. However, + /// for particular graph structures like in our case, the algorithm is still correct + /// when path_length is the length of the longest possible simple path. + /// See the formal description of the algorithm for more details. fn list_negative_cycles(&self, path_length: usize) -> Vec> { let nb_vertices = self.graph.len(); - //We start with every vertex at distance 0 of some imaginary extra -1 vertex. + // We start with every vertex at distance 0 of some imaginary extra -1 vertex. let mut distance = vec![0; nb_vertices]; - //The prev vector collects for every vertex from where does the shortest path come + // The prev vector collects for every vertex from where does the shortest path come let mut prev = vec![None; nb_vertices]; for _ in 0..path_length + 1 { @@ -369,29 +354,35 @@ impl Graph { } } - //If self.graph contains a negative cycle, then at this point the graph described - //by prev (which is a directed 1-forest/functional graph) - //must contain a cycle. We list the cycles of prev. + // If self.graph contains a negative cycle, then at this point the graph described + // by prev (which is a directed 1-forest/functional graph) + // must contain a cycle. We list the cycles of prev. let cycles_prev = cycles_of_1_forest(&prev); - //Remark that the cycle in prev is in the reverse order compared to the cycle - //in the graph. Thus the .rev(). + // Remark that the cycle in prev is in the reverse order compared to the cycle + // in the graph. Thus the .rev(). 
return cycles_prev .iter() - .map(|cycle| cycle.iter().rev().map(|id| self.idtovertex[*id]).collect()) + .map(|cycle| { + cycle + .iter() + .rev() + .map(|id| self.id_to_vertex[*id]) + .collect() + }) .collect(); } } -///This function returns the list of cycles of a directed 1 forest. It does not -///check for the consistency of the input. +/// This function returns the list of cycles of a directed 1 forest. It does not +/// check for the consistency of the input. fn cycles_of_1_forest(forest: &[Option]) -> Vec> { let mut cycles = Vec::>::new(); let mut time_of_discovery = vec![None; forest.len()]; for t in 0..forest.len() { let mut id = t; - //while we are on a valid undiscovered node + // while we are on a valid undiscovered node while time_of_discovery[id] == None { time_of_discovery[id] = Some(t); if let Some(i) = forest[id] { @@ -401,8 +392,8 @@ fn cycles_of_1_forest(forest: &[Option]) -> Vec> { } } if forest[id] != None && time_of_discovery[id] == Some(t) { - //We discovered an id that we explored at this iteration t. - //It means we are on a cycle + // We discovered an id that we explored at this iteration t. + // It means we are on a cycle let mut cy = vec![id; 1]; let mut id2 = id; while let Some(id_next) = forest[id2] { diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 38e56b88..95f69dc8 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -19,7 +19,7 @@ use std::convert::TryInto; const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; -//The Message type will be used to collect information on the algorithm. +// The Message type will be used to collect information on the algorithm. type Message = Vec; /// The layout of the cluster, i.e. the list of roles @@ -30,11 +30,11 @@ pub struct ClusterLayout { pub replication_factor: usize, - ///This attribute is only used to retain the previously computed partition size, - ///to know to what extent does it change with the layout update. 
+ /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. pub partition_size: u32, - ///Parameters used to compute the assignation currently given by - ///ring_assignation_data + /// Parameters used to compute the assignation currently given by + /// ring_assignation_data pub parameters: LayoutParameters, pub roles: LwwMap, @@ -53,14 +53,14 @@ pub struct ClusterLayout { pub ring_assignation_data: Vec, /// Parameters to be used in the next partition assignation computation. - pub staged_parameters: Lww, + pub staging_parameters: Lww, /// Role changes which are staged for the next version of the layout - pub staging: LwwMap, + pub staging_roles: LwwMap, pub staging_hash: Hash, } -///This struct is used to set the parameters to be used in the assignation computation -///algorithm. It is stored as a Crdt. +/// This struct is used to set the parameters to be used in the assignation computation +/// algorithm. It is stored as a Crdt. #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct LayoutParameters { pub zone_redundancy: usize, @@ -114,20 +114,19 @@ impl NodeRole { } } -//Implementation of the ClusterLayout methods unrelated to the assignation algorithm. +// Implementation of the ClusterLayout methods unrelated to the assignation algorithm. impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { - //We set the default zone redundancy to be equal to the replication factor, - //i.e. as strict as possible. + // We set the default zone redundancy to be equal to the replication factor, + // i.e. as strict as possible. 
let parameters = LayoutParameters { zone_redundancy: replication_factor, }; - let staged_parameters = Lww::::new(parameters.clone()); + let staging_parameters = Lww::::new(parameters.clone()); let empty_lwwmap = LwwMap::new(); - let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); - ClusterLayout { + let mut ret = ClusterLayout { version: 0, replication_factor, partition_size: 0, @@ -135,10 +134,17 @@ impl ClusterLayout { node_id_vec: Vec::new(), ring_assignation_data: Vec::new(), parameters, - staged_parameters, - staging: empty_lwwmap, - staging_hash: empty_lwwmap_hash, - } + staging_parameters, + staging_roles: empty_lwwmap, + staging_hash: [0u8; 32].into(), + }; + ret.staging_hash = ret.calculate_staging_hash(); + ret + } + + fn calculate_staging_hash(&self) -> Hash { + let hashed_tuple = (&self.staging_roles, &self.staging_parameters); + blake2sum(&rmp_to_vec_all_named(&hashed_tuple).unwrap()[..]) } pub fn merge(&mut self, other: &ClusterLayout) -> bool { @@ -148,16 +154,15 @@ impl ClusterLayout { true } Ordering::Equal => { - let param_changed = self.staged_parameters.get() != other.staged_parameters.get(); - self.staged_parameters.merge(&other.staged_parameters); - self.staging.merge(&other.staging); + self.staging_parameters.merge(&other.staging_parameters); + self.staging_roles.merge(&other.staging_roles); - let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); - let stage_changed = new_staging_hash != self.staging_hash; + let new_staging_hash = self.calculate_staging_hash(); + let changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; - stage_changed || param_changed + changed } Ordering::Less => false, } @@ -179,13 +184,14 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - self.roles.merge(&self.staging); + self.roles.merge(&self.staging_roles); self.roles.retain(|(_, _, v)| v.0.is_some()); + self.parameters = 
self.staging_parameters.get().clone(); let msg = self.calculate_partition_assignation()?; - self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); self.version += 1; @@ -208,9 +214,9 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); - self.staged_parameters.update(self.parameters.clone()); + self.staging_roles.clear(); + self.staging_hash = self.calculate_staging_hash(); + self.staging_parameters.update(self.parameters.clone()); self.version += 1; @@ -235,7 +241,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - ///Returns the uuids of the non_gateway nodes in self.node_id_vec. + /// Returns the uuids of the non_gateway nodes in self.node_id_vec. pub fn nongateway_nodes(&self) -> Vec { let mut result = Vec::::new(); for uuid in self.node_id_vec.iter() { @@ -247,7 +253,7 @@ To know the correct value of the new layout version, invoke `garage layout show` result } - ///Given a node uuids, this function returns the label of its zone + /// Given a node uuids, this function returns the label of its zone pub fn get_node_zone(&self, uuid: &Uuid) -> Result { match self.node_role(uuid) { Some(role) => Ok(role.zone.clone()), @@ -257,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - ///Given a node uuids, this function returns its capacity or fails if it does not have any + /// Given a node uuids, this function returns its capacity or fails if it does not have any pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { match self.node_role(uuid) { Some(NodeRole { @@ -273,7 +279,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - ///Returns the number of partitions 
associated to this node in the ring + /// Returns the number of partitions associated to this node in the ring pub fn get_node_usage(&self, uuid: &Uuid) -> Result { for (i, id) in self.node_id_vec.iter().enumerate() { if id == uuid { @@ -293,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show` )) } - ///Returns the sum of capacities of non gateway nodes in the cluster + /// Returns the sum of capacities of non gateway nodes in the cluster pub fn get_total_capacity(&self) -> Result { let mut total_capacity = 0; for uuid in self.nongateway_nodes().iter() { @@ -307,7 +313,7 @@ To know the correct value of the new layout version, invoke `garage layout show` /// returns true if consistent, false if error pub fn check(&self) -> bool { // Check that the hash of the staging data is correct - let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let staging_hash = self.calculate_staging_hash(); if staging_hash != self.staging_hash { return false; } @@ -346,14 +352,14 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - //Check that every partition is associated to distinct nodes + // Check that every partition is associated to distinct nodes let rf = self.replication_factor; for p in 0..(1 << PARTITION_BITS) { let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec(); if nodes_of_p.iter().unique().count() != rf { return false; } - //Check that every partition is spread over at least zone_redundancy zones. + // Check that every partition is spread over at least zone_redundancy zones. 
let zones_of_p = nodes_of_p.iter().map(|n| { self.get_node_zone(&self.node_id_vec[*n as usize]) .expect("Zone not found.") @@ -364,7 +370,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - //Check that the nodes capacities is consistent with the stored partitions + // Check that the nodes capacities is consistent with the stored partitions let mut node_usage = vec![0; MAX_NODE_NUMBER]; for n in self.ring_assignation_data.iter() { node_usage[*n as usize] += 1; @@ -380,8 +386,8 @@ To know the correct value of the new layout version, invoke `garage layout show` } } - //Check that the partition size stored is the one computed by the asignation - //algorithm. + // Check that the partition size stored is the one computed by the asignation + // algorithm. let cl2 = self.clone(); let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().expect("Critical Error"); match cl2.compute_optimal_partition_size(&zone_to_id) { @@ -394,7 +400,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } } -//Implementation of the ClusterLayout methods related to the assignation algorithm. +// Implementation of the ClusterLayout methods related to the assignation algorithm. impl ClusterLayout { /// This function calculates a new partition-to-node assignation. /// The computed assignation respects the node replication factor @@ -403,16 +409,13 @@ impl ClusterLayout { /// Among such optimal assignation, it minimizes the distance to /// the former assignation (if any) to minimize the amount of /// data to be moved. - // Staged role changes must be merged with nodes roles before calling this function, - // hence it must only be called from apply_staged_changes() and hence is not public. + /// Staged role changes must be merged with nodes roles before calling this function, + /// hence it must only be called from apply_staged_changes() and hence is not public. 
fn calculate_partition_assignation(&mut self) -> Result { - //We update the node ids, since the node role list might have changed with the - //changes in the layout. We retrieve the old_assignation reframed with new ids + // We update the node ids, since the node role list might have changed with the + // changes in the layout. We retrieve the old_assignation reframed with new ids let old_assignation_opt = self.update_node_id_vec()?; - //We update the parameters - self.parameters = self.staged_parameters.get().clone(); - let mut msg = Message::new(); msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into()); msg.push("".into()); @@ -422,8 +425,8 @@ impl ClusterLayout { self.replication_factor, self.parameters.zone_redundancy )); - //We generate for once numerical ids for the zones of non gateway nodes, - //to use them as indices in the flow graphs. + // We generate for once numerical ids for the zones of non gateway nodes, + // to use them as indices in the flow graphs. let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; let nb_nongateway_nodes = self.nongateway_nodes().len(); @@ -443,10 +446,10 @@ impl ClusterLayout { ))); } - //We compute the optimal partition size - //Capacities should be given in a unit so that partition size is at least 100. - //In this case, integer rounding plays a marginal role in the percentages of - //optimality. + // We compute the optimal partition size + // Capacities should be given in a unit so that partition size is at least 100. + // In this case, integer rounding plays a marginal role in the percentages of + // optimality. let partition_size = self.compute_optimal_partition_size(&zone_to_id)?; if old_assignation_opt != None { @@ -461,7 +464,7 @@ impl ClusterLayout { partition_size )); } - //We write the partition size. + // We write the partition size. 
self.partition_size = partition_size; if partition_size < 100 { @@ -472,15 +475,15 @@ impl ClusterLayout { ); } - //We compute a first flow/assignation that is heuristically close to the previous - //assignation + // We compute a first flow/assignation that is heuristically close to the previous + // assignation let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?; if let Some(assoc) = &old_assignation_opt { - //We minimize the distance to the previous assignation. + // We minimize the distance to the previous assignation. self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?; } - //We display statistics of the computation + // We display statistics of the computation msg.append(&mut self.output_stat( &gflow, &old_assignation_opt, @@ -489,7 +492,7 @@ impl ClusterLayout { )?); msg.push("".to_string()); - //We update the layout structure + // We update the layout structure self.update_ring_from_flow(id_to_zone.len(), &gflow)?; if !self.check() { @@ -508,8 +511,8 @@ impl ClusterLayout { /// do modify assignation_ring and node_id_vec. fn update_node_id_vec(&mut self) -> Result>>, Error> { // (1) We compute the new node list - //Non gateway nodes should be coded on 8bits, hence they must be first in the list - //We build the new node ids + // Non gateway nodes should be coded on 8bits, hence they must be first in the list + // We build the new node ids let mut new_non_gateway_nodes: Vec = self .roles .items() @@ -542,12 +545,12 @@ impl ClusterLayout { self.node_id_vec = new_node_id_vec.clone(); // (2) We retrieve the old association - //We rewrite the old association with the new indices. We only consider partition - //to node assignations where the node is still in use. + // We rewrite the old association with the new indices. We only consider partition + // to node assignations where the node is still in use. 
let mut old_assignation = vec![Vec::::new(); NB_PARTITIONS]; if self.ring_assignation_data.is_empty() { - //This is a new association + // This is a new association return Ok(None); } if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor { @@ -558,11 +561,11 @@ impl ClusterLayout { )); } - //We build a translation table between the uuid and new ids + // We build a translation table between the uuid and new ids let mut uuid_to_new_id = HashMap::::new(); - //We add the indices of only the new non-gateway nodes that can be used in the - //association ring + // We add the indices of only the new non-gateway nodes that can be used in the + // association ring for (i, uuid) in new_node_id_vec.iter().enumerate() { uuid_to_new_id.insert(*uuid, i); } @@ -577,14 +580,14 @@ impl ClusterLayout { } } - //We write the ring + // We write the ring self.ring_assignation_data = Vec::::new(); Ok(Some(old_assignation)) } - ///This function generates ids for the zone of the nodes appearing in - ///self.node_id_vec. + /// This function generates ids for the zone of the nodes appearing in + /// self.node_id_vec. fn generate_nongateway_zone_ids(&self) -> Result<(Vec, HashMap), Error> { let mut id_to_zone = Vec::::new(); let mut zone_to_id = HashMap::::new(); @@ -607,8 +610,8 @@ impl ClusterLayout { Ok((id_to_zone, zone_to_id)) } - ///This function computes by dichotomy the largest realizable partition size, given - ///the layout roles and parameters. + /// This function computes by dichotomy the largest realizable partition size, given + /// the layout roles and parameters. fn compute_optimal_partition_size( &self, zone_to_id: &HashMap, @@ -662,13 +665,13 @@ impl ClusterLayout { vertices } - ///Generates the graph to compute the maximal flow corresponding to the optimal - ///partition assignation. - ///exclude_assoc is the set of (partition, node) association that we are forbidden - ///to use (hence we do not add the corresponding edge to the graph). 
This parameter - ///is used to compute a first flow that uses only edges appearing in the previous - ///assignation. This produces a solution that heuristically should be close to the - ///previous one. + /// Generates the graph to compute the maximal flow corresponding to the optimal + /// partition assignation. + /// exclude_assoc is the set of (partition, node) association that we are forbidden + /// to use (hence we do not add the corresponding edge to the graph). This parameter + /// is used to compute a first flow that uses only edges appearing in the previous + /// assignation. This produces a solution that heuristically should be close to the + /// previous one. fn generate_flow_graph( &self, partition_size: u32, @@ -709,14 +712,14 @@ impl ClusterLayout { Ok(g) } - ///This function computes a first optimal assignation (in the form of a flow graph). + /// This function computes a first optimal assignation (in the form of a flow graph). fn compute_candidate_assignation( &self, zone_to_id: &HashMap, prev_assign_opt: &Option>>, ) -> Result, Error> { - //We list the (partition,node) associations that are not used in the - //previous assignation + // We list the (partition,node) associations that are not used in the + // previous assignation let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { let nb_nodes = self.nongateway_nodes().len(); @@ -730,13 +733,13 @@ impl ClusterLayout { } } - //We compute the best flow using only the edges used in the previous assignation + // We compute the best flow using only the edges used in the previous assignation let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?; g.compute_maximal_flow()?; - //We add the excluded edges and compute the maximal flow with the full graph. - //The algorithm is such that it will start with the flow that we just computed - //and find ameliorating paths from that. 
+ // We add the excluded edges and compute the maximal flow with the full graph. + // The algorithm is such that it will start with the flow that we just computed + // and find ameliorating paths from that. for (p, n) in exclude_edge.iter() { let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; @@ -745,16 +748,16 @@ impl ClusterLayout { Ok(g) } - ///This function updates the flow graph gflow to minimize the distance between - ///its corresponding assignation and the previous one + /// This function updates the flow graph gflow to minimize the distance between + /// its corresponding assignation and the previous one fn minimize_rebalance_load( &self, gflow: &mut Graph, zone_to_id: &HashMap, prev_assign: &[Vec], ) -> Result<(), Error> { - //We define a cost function on the edges (pairs of vertices) corresponding - //to the distance between the two assignations. + // We define a cost function on the edges (pairs of vertices) corresponding + // to the distance between the two assignations. let mut cost = CostFunction::new(); for (p, assoc_p) in prev_assign.iter().enumerate() { for n in assoc_p.iter() { @@ -763,9 +766,9 @@ impl ClusterLayout { } } - //We compute the maximal length of a simple path in gflow. It is used in the - //Bellman-Ford algorithm in optimize_flow_with_cost to set the number - //of iterations. + // We compute the maximal length of a simple path in gflow. It is used in the + // Bellman-Ford algorithm in optimize_flow_with_cost to set the number + // of iterations. let nb_nodes = self.nongateway_nodes().len(); let path_length = 4 * nb_nodes; gflow.optimize_flow_with_cost(&cost, path_length)?; @@ -773,7 +776,7 @@ impl ClusterLayout { Ok(()) } - ///This function updates the assignation ring from the flow graph. + /// This function updates the assignation ring from the flow graph. 
fn update_ring_from_flow( &mut self, nb_zones: usize, @@ -801,8 +804,8 @@ impl ClusterLayout { Ok(()) } - ///This function returns a message summing up the partition repartition of the new - ///layout, and other statistics of the partition assignation computation. + /// This function returns a message summing up the partition repartition of the new + /// layout, and other statistics of the partition assignation computation. fn output_stat( &self, gflow: &Graph, @@ -837,7 +840,7 @@ impl ClusterLayout { used_cap / self.replication_factor as u32 )); - //We define and fill in the following tables + // We define and fill in the following tables let storing_nodes = self.nongateway_nodes(); let mut new_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()]; @@ -879,7 +882,7 @@ impl ClusterLayout { new_partitions_zone = stored_partitions_zone.clone(); } - //We display the statistics + // We display the statistics msg.push("".into()); if *prev_assign_opt != None { @@ -951,27 +954,27 @@ impl ClusterLayout { } } -//==================================================================================== +// ==================================================================================== #[cfg(test)] mod tests { use super::{Error, *}; use std::cmp::min; - //This function checks that the partition size S computed is at least better than the - //one given by a very naive algorithm. To do so, we try to run the naive algorithm - //assuming a partion size of S+1. If we succed, it means that the optimal assignation - //was not optimal. The naive algorithm is the following : - //- we compute the max number of partitions associated to every node, capped at the - //partition number. It gives the number of tokens of every node. - //- every zone has a number of tokens equal to the sum of the tokens of its nodes. - //- we cycle over the partitions and associate zone tokens while respecting the - //zone redundancy constraint. 
- //NOTE: the naive algorithm is not optimal. Counter example: - //take nb_partition = 3 ; replication_factor = 5; redundancy = 4; - //number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) - //With these parameters, the naive algo fails, whereas there is a solution: - //(A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) + // This function checks that the partition size S computed is at least better than the + // one given by a very naive algorithm. To do so, we try to run the naive algorithm + // assuming a partition size of S+1. If we succeed, it means that the optimal assignation + // was not optimal. The naive algorithm is the following : + // - we compute the max number of partitions associated to every node, capped at the + // partition number. It gives the number of tokens of every node. + // - every zone has a number of tokens equal to the sum of the tokens of its nodes. + // - we cycle over the partitions and associate zone tokens while respecting the + // zone redundancy constraint. + // NOTE: the naive algorithm is not optimal. 
Counter example: + // take nb_partition = 3 ; replication_factor = 5; redundancy = 4; + // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2) + // With these parameters, the naive algo fails, whereas there is a solution: + // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E) fn check_against_naive(cl: &ClusterLayout) -> Result { let over_size = cl.partition_size + 1; let mut zone_token = HashMap::::new(); @@ -994,8 +997,8 @@ mod tests { ); } - //For every partition, we count the number of zone already associated and - //the name of the last zone associated + // For every partition, we count the number of zone already associated and + // the name of the last zone associated let mut id_zone_token = vec![0; zones.len()]; for (z, t) in zone_token.iter() { @@ -1049,7 +1052,7 @@ mod tests { cl.node_id_vec.push(x); } - let update = cl.staging.update_mutator( + let update = cl.staging_roles.update_mutator( cl.node_id_vec[i], NodeRoleV(Some(NodeRole { zone: (node_zone_vec[i].to_string()), @@ -1057,10 +1060,10 @@ mod tests { tags: (vec![]), })), ); - cl.staging.merge(&update); + cl.staging_roles.merge(&update); } - cl.staging_hash = blake2sum(&rmp_to_vec_all_named(&cl.staging).unwrap()[..]); - cl.staged_parameters + cl.staging_hash = cl.calculate_staging_hash(); + cl.staging_parameters .update(LayoutParameters { zone_redundancy }); }