Garage v0.9 #473

Merged
lx merged 175 commits from next into main 2023-10-10 13:28:29 +00:00
8 changed files with 1109 additions and 985 deletions
Showing only changes of commit 4abab246f1 - Show all commits

View file

@ -163,10 +163,10 @@ pub async fn handle_apply_cluster_layout(
let layout = garage.system.get_cluster_layout(); let layout = garage.system.get_cluster_layout();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
//TODO : how to display msg ? Should it be in the Body Response ? //TODO : how to display msg ? Should it be in the Body Response ?
for s in msg.iter() { for s in msg.iter() {
println!("{}", s); println!("{}", s);
} }
garage.system.update_cluster_layout(&layout).await?; garage.system.update_cluster_layout(&layout).await?;

View file

@ -4,7 +4,6 @@ extern crate tracing;
#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))] #[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
//compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); //compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
#[cfg(feature = "lmdb")] #[cfg(feature = "lmdb")]
pub mod lmdb_adapter; pub mod lmdb_adapter;
#[cfg(feature = "sled")] #[cfg(feature = "sled")]

View file

@ -27,9 +27,9 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Revert(revert_opt) => { LayoutOperation::Revert(revert_opt) => {
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
} }
LayoutOperation::Config(config_opt) => { LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
} }
} }
} }
@ -190,28 +190,35 @@ pub async fn cmd_show_layout(
println!(); println!();
println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ===="); println!("==== PARAMETERS OF THE LAYOUT COMPUTATION ====");
println!("Zone redundancy: {}", layout.staged_parameters.get().zone_redundancy); println!(
"Zone redundancy: {}",
layout.staged_parameters.get().zone_redundancy
);
println!(); println!();
// this will print the stats of what partitions // this will print the stats of what partitions
// will move around when we apply // will move around when we apply
match layout.calculate_partition_assignation() { match layout.calculate_partition_assignation() {
Ok(msg) => { Ok(msg) => {
for line in msg.iter() { for line in msg.iter() {
println!("{}", line); println!("{}", line);
} }
println!("To enact the staged role changes, type:"); println!("To enact the staged role changes, type:");
println!(); println!();
println!(" garage layout apply --version {}", layout.version + 1); println!(" garage layout apply --version {}", layout.version + 1);
println!(); println!();
println!( println!(
"You can also revert all proposed changes with: garage layout revert --version {}", "You can also revert all proposed changes with: garage layout revert --version {}",
layout.version + 1)}, layout.version + 1)
Err(Error::Message(s)) => { }
println!("Error while trying to compute the assignation: {}", s); Err(Error::Message(s)) => {
println!("This new layout cannot yet be applied.");}, println!("Error while trying to compute the assignation: {}", s);
_ => { println!("Unknown Error"); }, println!("This new layout cannot yet be applied.");
} }
_ => {
println!("Unknown Error");
}
}
} }
Ok(()) Ok(())
@ -225,9 +232,9 @@ pub async fn cmd_apply_layout(
let layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?; let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
for line in msg.iter() { for line in msg.iter() {
println!("{}", line); println!("{}", line);
} }
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;
@ -259,25 +266,28 @@ pub async fn cmd_config_layout(
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
match config_opt.redundancy { match config_opt.redundancy {
None => (), None => (),
Some(r) => { Some(r) => {
if r > layout.replication_factor { if r > layout.replication_factor {
println!("The zone redundancy must be smaller or equal to the \ println!(
replication factor ({}).", layout.replication_factor); "The zone redundancy must be smaller or equal to the \
} replication factor ({}).",
else if r < 1 { layout.replication_factor
println!("The zone redundancy must be at least 1."); );
} } else if r < 1 {
else { println!("The zone redundancy must be at least 1.");
layout.staged_parameters.update(LayoutParameters{ zone_redundancy: r }); } else {
println!("The new zone redundancy has been saved ({}).", r); layout
} .staged_parameters
} .update(LayoutParameters { zone_redundancy: r });
} println!("The new zone redundancy has been saved ({}).", r);
}
}
}
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;
Ok(()) Ok(())
} }
// --- utility --- // --- utility ---

View file

@ -87,9 +87,9 @@ pub enum LayoutOperation {
#[structopt(name = "remove", version = garage_version())] #[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt), Remove(RemoveRoleOpt),
/// Configure parameters value for the layout computation /// Configure parameters value for the layout computation
#[structopt(name = "config", version = garage_version())] #[structopt(name = "config", version = garage_version())]
Config(ConfigLayoutOpt), Config(ConfigLayoutOpt),
/// Show roles currently assigned to nodes and changes staged for commit /// Show roles currently assigned to nodes and changes staged for commit
#[structopt(name = "show", version = garage_version())] #[structopt(name = "show", version = garage_version())]
@ -104,7 +104,6 @@ pub enum LayoutOperation {
Revert(RevertLayoutOpt), Revert(RevertLayoutOpt),
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
pub struct AssignRoleOpt { pub struct AssignRoleOpt {
/// Node(s) to which to assign role (prefix of hexadecimal node id) /// Node(s) to which to assign role (prefix of hexadecimal node id)

View file

@ -1,42 +1,40 @@
//! This module deals with graph algorithms. //! This module deals with graph algorithms.
//! It is used in layout.rs to build the partition to node assignation. //! It is used in layout.rs to build the partition to node assignation.
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::collections::VecDeque;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::VecDeque;
//Vertex data structures used in all the graphs used in layout.rs. //Vertex data structures used in all the graphs used in layout.rs.
//usize parameters correspond to node/zone/partitions ids. //usize parameters correspond to node/zone/partitions ids.
//To understand the vertex roles below, please refer to the formal description //To understand the vertex roles below, please refer to the formal description
//of the layout computation algorithm. //of the layout computation algorithm.
#[derive(Clone,Copy,Debug, PartialEq, Eq, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Vertex{ pub enum Vertex {
Source, Source,
Pup(usize), //The vertex p+ of partition p Pup(usize), //The vertex p+ of partition p
Pdown(usize), //The vertex p- of partition p Pdown(usize), //The vertex p- of partition p
PZ(usize,usize), //The vertex corresponding to x_(partition p, zone z) PZ(usize, usize), //The vertex corresponding to x_(partition p, zone z)
N(usize), //The vertex corresponding to node n N(usize), //The vertex corresponding to node n
Sink Sink,
} }
//Edge data structure for the flow algorithm. //Edge data structure for the flow algorithm.
//The graph is stored as an adjacency list //The graph is stored as an adjacency list
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct FlowEdge { pub struct FlowEdge {
cap: u32, //flow maximal capacity of the edge cap: u32, //flow maximal capacity of the edge
flow: i32, //flow value on the edge flow: i32, //flow value on the edge
dest: usize, //destination vertex id dest: usize, //destination vertex id
rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v rev: usize, //index of the reversed edge (v, self) in the edge list of vertex v
} }
//Edge data structure for the detection of negative cycles. //Edge data structure for the detection of negative cycles.
//The graph is stored as a list of edges (u,v). //The graph is stored as a list of edges (u,v).
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct WeightedEdge { pub struct WeightedEdge {
w: i32, //weight of the edge w: i32, //weight of the edge
dest: usize, dest: usize,
} }
@ -47,375 +45,377 @@ impl Edge for WeightedEdge {}
//Struct for the graph structure. We do encapsulation here to be able to both //Struct for the graph structure. We do encapsulation here to be able to both
//provide user friendly Vertex enum to address vertices, and to use usize indices //provide user friendly Vertex enum to address vertices, and to use usize indices
//and Vec instead of HashMap in the graph algorithm to optimize execution speed. //and Vec instead of HashMap in the graph algorithm to optimize execution speed.
pub struct Graph<E : Edge>{ pub struct Graph<E: Edge> {
vertextoid : HashMap<Vertex , usize>, vertextoid: HashMap<Vertex, usize>,
idtovertex : Vec<Vertex>, idtovertex: Vec<Vertex>,
graph : Vec< Vec<E> > graph: Vec<Vec<E>>,
} }
pub type CostFunction = HashMap<(Vertex,Vertex), i32>; pub type CostFunction = HashMap<(Vertex, Vertex), i32>;
impl<E : Edge> Graph<E>{ impl<E: Edge> Graph<E> {
pub fn new(vertices : &[Vertex]) -> Self { pub fn new(vertices: &[Vertex]) -> Self {
let mut map = HashMap::<Vertex, usize>::new(); let mut map = HashMap::<Vertex, usize>::new();
for (i, vert) in vertices.iter().enumerate(){ for (i, vert) in vertices.iter().enumerate() {
map.insert(*vert , i); map.insert(*vert, i);
} }
Graph::<E> { Graph::<E> {
vertextoid : map, vertextoid: map,
idtovertex: vertices.to_vec(), idtovertex: vertices.to_vec(),
graph : vec![Vec::< E >::new(); vertices.len() ] graph: vec![Vec::<E>::new(); vertices.len()],
} }
} }
} }
impl Graph<FlowEdge>{ impl Graph<FlowEdge> {
//This function adds a directed edge to the graph with capacity c, and the //This function adds a directed edge to the graph with capacity c, and the
//corresponding reversed edge with capacity 0. //corresponding reversed edge with capacity 0.
pub fn add_edge(&mut self, u: Vertex, v:Vertex, c: u32) -> Result<(), String>{ pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> {
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
return Err("The graph does not contain the provided vertex.".to_string()); return Err("The graph does not contain the provided vertex.".to_string());
} }
let idu = self.vertextoid[&u]; let idu = self.vertextoid[&u];
let idv = self.vertextoid[&v]; let idv = self.vertextoid[&v];
let rev_u = self.graph[idu].len(); let rev_u = self.graph[idu].len();
let rev_v = self.graph[idv].len(); let rev_v = self.graph[idv].len();
self.graph[idu].push( FlowEdge{cap: c , dest: idv , flow: 0, rev : rev_v} ); self.graph[idu].push(FlowEdge {
self.graph[idv].push( FlowEdge{cap: 0 , dest: idu , flow: 0, rev : rev_u} ); cap: c,
Ok(()) dest: idv,
} flow: 0,
rev: rev_v,
});
self.graph[idv].push(FlowEdge {
cap: 0,
dest: idu,
flow: 0,
rev: rev_u,
});
Ok(())
}
//This function returns the list of vertices that receive a positive flow from //This function returns the list of vertices that receive a positive flow from
//vertex v. //vertex v.
pub fn get_positive_flow_from(&self , v:Vertex) -> Result< Vec<Vertex> , String>{ pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
if !self.vertextoid.contains_key(&v) { if !self.vertextoid.contains_key(&v) {
return Err("The graph does not contain the provided vertex.".to_string()); return Err("The graph does not contain the provided vertex.".to_string());
} }
let idv = self.vertextoid[&v]; let idv = self.vertextoid[&v];
let mut result = Vec::<Vertex>::new(); let mut result = Vec::<Vertex>::new();
for edge in self.graph[idv].iter() { for edge in self.graph[idv].iter() {
if edge.flow > 0 { if edge.flow > 0 {
result.push(self.idtovertex[edge.dest]); result.push(self.idtovertex[edge.dest]);
} }
} }
Ok(result) Ok(result)
} }
//This function returns the value of the flow incoming to v.
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> {
if !self.vertextoid.contains_key(&v) {
return Err("The graph does not contain the provided vertex.".to_string());
}
let idv = self.vertextoid[&v];
let mut result = 0;
for edge in self.graph[idv].iter() {
result += max(0, self.graph[edge.dest][edge.rev].flow);
}
Ok(result)
}
//This function returns the value of the flow incoming to v. //This function returns the value of the flow outgoing from v.
pub fn get_inflow(&self , v:Vertex) -> Result< i32 , String>{ pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> {
if !self.vertextoid.contains_key(&v) { if !self.vertextoid.contains_key(&v) {
return Err("The graph does not contain the provided vertex.".to_string()); return Err("The graph does not contain the provided vertex.".to_string());
} }
let idv = self.vertextoid[&v]; let idv = self.vertextoid[&v];
let mut result = 0; let mut result = 0;
for edge in self.graph[idv].iter() { for edge in self.graph[idv].iter() {
result += max(0,self.graph[edge.dest][edge.rev].flow); result += max(0, edge.flow);
} }
Ok(result) Ok(result)
} }
//This function returns the value of the flow outgoing from v. //This function computes the flow total value by computing the outgoing flow
pub fn get_outflow(&self , v:Vertex) -> Result< i32 , String>{ //from the source.
if !self.vertextoid.contains_key(&v) { pub fn get_flow_value(&mut self) -> Result<i32, String> {
return Err("The graph does not contain the provided vertex.".to_string()); self.get_outflow(Vertex::Source)
} }
let idv = self.vertextoid[&v];
let mut result = 0;
for edge in self.graph[idv].iter() {
result += max(0,edge.flow);
}
Ok(result)
}
//This function computes the flow total value by computing the outgoing flow //This function shuffles the order of the edge lists. It keeps the ids of the
//from the source. //reversed edges consistent.
pub fn get_flow_value(&mut self) -> Result<i32, String> { fn shuffle_edges(&mut self) {
self.get_outflow(Vertex::Source) let mut rng = rand::thread_rng();
} for i in 0..self.graph.len() {
self.graph[i].shuffle(&mut rng);
//We need to update the ids of the reverse edges.
for j in 0..self.graph[i].len() {
let target_v = self.graph[i][j].dest;
let target_rev = self.graph[i][j].rev;
self.graph[target_v][target_rev].rev = j;
}
}
}
//This function shuffles the order of the edge lists. It keeps the ids of the //Computes an upper bound of the flow n the graph
//reversed edges consistent. pub fn flow_upper_bound(&self) -> u32 {
fn shuffle_edges(&mut self) { let idsource = self.vertextoid[&Vertex::Source];
let mut rng = rand::thread_rng(); let mut flow_upper_bound = 0;
for i in 0..self.graph.len() { for edge in self.graph[idsource].iter() {
self.graph[i].shuffle(&mut rng); flow_upper_bound += edge.cap;
//We need to update the ids of the reverse edges. }
for j in 0..self.graph[i].len() { flow_upper_bound
let target_v = self.graph[i][j].dest; }
let target_rev = self.graph[i][j].rev;
self.graph[target_v][target_rev].rev = j;
}
}
}
//Computes an upper bound of the flow n the graph //This function computes the maximal flow using Dinic's algorithm. It starts with
pub fn flow_upper_bound(&self) -> u32{ //the flow values already present in the graph. So it is possible to add some edge to
let idsource = self.vertextoid[&Vertex::Source]; //the graph, compute a flow, add other edges, update the flow.
let mut flow_upper_bound = 0; pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
for edge in self.graph[idsource].iter(){ if !self.vertextoid.contains_key(&Vertex::Source) {
flow_upper_bound += edge.cap; return Err("The graph does not contain a source.".to_string());
} }
flow_upper_bound if !self.vertextoid.contains_key(&Vertex::Sink) {
} return Err("The graph does not contain a sink.".to_string());
}
//This function computes the maximal flow using Dinic's algorithm. It starts with let idsource = self.vertextoid[&Vertex::Source];
//the flow values already present in the graph. So it is possible to add some edge to let idsink = self.vertextoid[&Vertex::Sink];
//the graph, compute a flow, add other edges, update the flow.
pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
if !self.vertextoid.contains_key(&Vertex::Source) {
return Err("The graph does not contain a source.".to_string());
}
if !self.vertextoid.contains_key(&Vertex::Sink) {
return Err("The graph does not contain a sink.".to_string());
}
let idsource = self.vertextoid[&Vertex::Source]; let nb_vertices = self.graph.len();
let idsink = self.vertextoid[&Vertex::Sink];
let nb_vertices = self.graph.len(); let flow_upper_bound = self.flow_upper_bound();
let flow_upper_bound = self.flow_upper_bound(); //To ensure the dispersion of the associations generated by the
//assignation, we shuffle the neighbours of the nodes. Hence,
//the vertices do not consider their neighbours in the same order.
self.shuffle_edges();
//To ensure the dispersion of the associations generated by the //We run Dinic's max flow algorithm
//assignation, we shuffle the neighbours of the nodes. Hence, loop {
//the vertices do not consider their neighbours in the same order. //We build the level array from Dinic's algorithm.
self.shuffle_edges(); let mut level = vec![None; nb_vertices];
//We run Dinic's max flow algorithm let mut fifo = VecDeque::new();
loop { fifo.push_back((idsource, 0));
//We build the level array from Dinic's algorithm. while !fifo.is_empty() {
let mut level = vec![None; nb_vertices]; if let Some((id, lvl)) = fifo.pop_front() {
if level[id] == None {
//it means id has not yet been reached
level[id] = Some(lvl);
for edge in self.graph[id].iter() {
if edge.cap as i32 - edge.flow > 0 {
fifo.push_back((edge.dest, lvl + 1));
}
}
}
}
}
if level[idsink] == None {
//There is no residual flow
break;
}
//Now we run DFS respecting the level array
let mut next_nbd = vec![0; nb_vertices];
let mut lifo = VecDeque::new();
let mut fifo = VecDeque::new(); lifo.push_back((idsource, flow_upper_bound));
fifo.push_back((idsource, 0));
while !fifo.is_empty() {
if let Some((id, lvl)) = fifo.pop_front() {
if level[id] == None { //it means id has not yet been reached
level[id] = Some(lvl);
for edge in self.graph[id].iter() {
if edge.cap as i32 - edge.flow > 0 {
fifo.push_back((edge.dest, lvl + 1));
}
}
}
}
}
if level[idsink] == None {
//There is no residual flow
break;
}
//Now we run DFS respecting the level array
let mut next_nbd = vec![0; nb_vertices];
let mut lifo = VecDeque::new();
lifo.push_back((idsource, flow_upper_bound)); while let Some((id_tmp, f_tmp)) = lifo.back() {
let id = *id_tmp;
let f = *f_tmp;
if id == idsink {
//The DFS reached the sink, we can add a
//residual flow.
lifo.pop_back();
while let Some((id, _)) = lifo.pop_back() {
let nbd = next_nbd[id];
self.graph[id][nbd].flow += f as i32;
let id_rev = self.graph[id][nbd].dest;
let nbd_rev = self.graph[id][nbd].rev;
self.graph[id_rev][nbd_rev].flow -= f as i32;
}
lifo.push_back((idsource, flow_upper_bound));
continue;
}
//else we did not reach the sink
let nbd = next_nbd[id];
if nbd >= self.graph[id].len() {
//There is nothing to explore from id anymore
lifo.pop_back();
if let Some((parent, _)) = lifo.back() {
next_nbd[*parent] += 1;
}
continue;
}
//else we can try to send flow from id to its nbd
let new_flow = min(
f as i32,
self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow,
) as u32;
if new_flow == 0 {
next_nbd[id] += 1;
continue;
}
if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) {
if lvldest <= lvlid {
//We cannot send flow to nbd.
next_nbd[id] += 1;
continue;
}
}
//otherwise, we send flow to nbd.
lifo.push_back((self.graph[id][nbd].dest, new_flow));
}
}
Ok(())
}
while let Some((id_tmp, f_tmp)) = lifo.back() { //This function takes a flow, and a cost function on the edges, and tries to find an
let id = *id_tmp; // equivalent flow with a better cost, by finding improving overflow cycles. It uses
let f = *f_tmp; // as subroutine the Bellman Ford algorithm run up to path_length.
if id == idsink { // We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and only
//The DFS reached the sink, we can add a // one needs to be present in the cost function.
//residual flow. pub fn optimize_flow_with_cost(
lifo.pop_back(); &mut self,
while let Some((id, _)) = lifo.pop_back() { cost: &CostFunction,
let nbd = next_nbd[id]; path_length: usize,
self.graph[id][nbd].flow += f as i32; ) -> Result<(), String> {
let id_rev = self.graph[id][nbd].dest; //We build the weighted graph g where we will look for negative cycle
let nbd_rev = self.graph[id][nbd].rev; let mut gf = self.build_cost_graph(cost)?;
self.graph[id_rev][nbd_rev].flow -= f as i32; let mut cycles = gf.list_negative_cycles(path_length);
} while !cycles.is_empty() {
lifo.push_back((idsource, flow_upper_bound)); //we enumerate negative cycles
continue; for c in cycles.iter() {
} for i in 0..c.len() {
//else we did not reach the sink //We add one flow unit to the edge (u,v) of cycle c
let nbd = next_nbd[id]; let idu = self.vertextoid[&c[i]];
if nbd >= self.graph[id].len() { let idv = self.vertextoid[&c[(i + 1) % c.len()]];
//There is nothing to explore from id anymore for j in 0..self.graph[idu].len() {
lifo.pop_back(); //since idu appears at most once in the cycles, we enumerate every
if let Some((parent, _)) = lifo.back() { //edge at most once.
next_nbd[*parent] += 1; let edge = self.graph[idu][j];
} if edge.dest == idv {
continue; self.graph[idu][j].flow += 1;
} self.graph[idv][edge.rev].flow -= 1;
//else we can try to send flow from id to its nbd break;
let new_flow = min(f as i32, self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow) as u32; }
if new_flow == 0 { }
next_nbd[id] += 1; }
continue; }
}
if let (Some(lvldest), Some(lvlid)) =
(level[self.graph[id][nbd].dest], level[id]){
if lvldest <= lvlid {
//We cannot send flow to nbd.
next_nbd[id] += 1;
continue;
}
}
//otherwise, we send flow to nbd.
lifo.push_back((self.graph[id][nbd].dest, new_flow));
}
}
Ok(())
}
//This function takes a flow, and a cost function on the edges, and tries to find an
// equivalent flow with a better cost, by finding improving overflow cycles. It uses
// as subroutine the Bellman Ford algorithm run up to path_length.
// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and only
// one needs to be present in the cost function.
pub fn optimize_flow_with_cost(&mut self , cost: &CostFunction, path_length: usize )
-> Result<(),String>{
//We build the weighted graph g where we will look for negative cycle
let mut gf = self.build_cost_graph(cost)?;
let mut cycles = gf.list_negative_cycles(path_length);
while !cycles.is_empty() {
//we enumerate negative cycles
for c in cycles.iter(){
for i in 0..c.len(){
//We add one flow unit to the edge (u,v) of cycle c
let idu = self.vertextoid[&c[i]];
let idv = self.vertextoid[&c[(i+1)%c.len()]];
for j in 0..self.graph[idu].len(){
//since idu appears at most once in the cycles, we enumerate every
//edge at most once.
let edge = self.graph[idu][j];
if edge.dest == idv {
self.graph[idu][j].flow += 1;
self.graph[idv][edge.rev].flow -=1;
break;
}
}
}
}
gf = self.build_cost_graph(cost)?;
cycles = gf.list_negative_cycles(path_length);
}
Ok(())
}
//Construct the weighted graph G_f from the flow and the cost function
fn build_cost_graph(&self , cost: &CostFunction) -> Result<Graph<WeightedEdge>,String>{
let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
let nb_vertices = self.idtovertex.len();
for i in 0..nb_vertices {
for edge in self.graph[i].iter() {
if edge.cap as i32 -edge.flow > 0 {
//It is possible to send overflow through this edge
let u = self.idtovertex[i];
let v = self.idtovertex[edge.dest];
if cost.contains_key(&(u,v)) {
g.add_edge(u,v, cost[&(u,v)])?;
}
else if cost.contains_key(&(v,u)) {
g.add_edge(u,v, -cost[&(v,u)])?;
}
else{
g.add_edge(u,v, 0)?;
}
}
}
}
Ok(g)
}
gf = self.build_cost_graph(cost)?;
cycles = gf.list_negative_cycles(path_length);
}
Ok(())
}
//Construct the weighted graph G_f from the flow and the cost function
fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
let mut g = Graph::<WeightedEdge>::new(&self.idtovertex);
let nb_vertices = self.idtovertex.len();
for i in 0..nb_vertices {
for edge in self.graph[i].iter() {
if edge.cap as i32 - edge.flow > 0 {
//It is possible to send overflow through this edge
let u = self.idtovertex[i];
let v = self.idtovertex[edge.dest];
if cost.contains_key(&(u, v)) {
g.add_edge(u, v, cost[&(u, v)])?;
} else if cost.contains_key(&(v, u)) {
g.add_edge(u, v, -cost[&(v, u)])?;
} else {
g.add_edge(u, v, 0)?;
}
}
}
}
Ok(g)
}
} }
impl Graph<WeightedEdge>{ impl Graph<WeightedEdge> {
//This function adds a single directed weighted edge to the graph. //This function adds a single directed weighted edge to the graph.
pub fn add_edge(&mut self, u: Vertex, v:Vertex, w: i32) -> Result<(), String>{ pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> {
if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) { if !self.vertextoid.contains_key(&u) || !self.vertextoid.contains_key(&v) {
return Err("The graph does not contain the provided vertex.".to_string()); return Err("The graph does not contain the provided vertex.".to_string());
} }
let idu = self.vertextoid[&u]; let idu = self.vertextoid[&u];
let idv = self.vertextoid[&v]; let idv = self.vertextoid[&v];
self.graph[idu].push( WeightedEdge{ w , dest: idv} ); self.graph[idu].push(WeightedEdge { w, dest: idv });
Ok(()) Ok(())
} }
//This function lists the negative cycles it manages to find after path_length //This function lists the negative cycles it manages to find after path_length
//iterations of the main loop of the Bellman-Ford algorithm. For the classical //iterations of the main loop of the Bellman-Ford algorithm. For the classical
//algorithm, path_length needs to be equal to the number of vertices. However, //algorithm, path_length needs to be equal to the number of vertices. However,
//for particular graph structures like our case, the algorithm is still correct //for particular graph structures like our case, the algorithm is still correct
//when path_length is the length of the longest possible simple path. //when path_length is the length of the longest possible simple path.
//See the formal description of the algorithm for more details. //See the formal description of the algorithm for more details.
fn list_negative_cycles(&self, path_length: usize) -> Vec< Vec<Vertex> > { fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
let nb_vertices = self.graph.len();
let nb_vertices = self.graph.len(); //We start with every vertex at distance 0 of some imaginary extra -1 vertex.
let mut distance = vec![0; nb_vertices];
//The prev vector collects for every vertex from where does the shortest path come
let mut prev = vec![None; nb_vertices];
//We start with every vertex at distance 0 of some imaginary extra -1 vertex. for _ in 0..path_length + 1 {
let mut distance = vec![0 ; nb_vertices]; for id in 0..nb_vertices {
//The prev vector collects for every vertex from where does the shortest path come for e in self.graph[id].iter() {
let mut prev = vec![None; nb_vertices]; if distance[id] + e.w < distance[e.dest] {
distance[e.dest] = distance[id] + e.w;
prev[e.dest] = Some(id);
}
}
}
}
for _ in 0..path_length +1 { //If self.graph contains a negative cycle, then at this point the graph described
for id in 0..nb_vertices{ //by prev (which is a directed 1-forest/functional graph)
for e in self.graph[id].iter(){ //must contain a cycle. We list the cycles of prev.
if distance[id] + e.w < distance[e.dest] { let cycles_prev = cycles_of_1_forest(&prev);
distance[e.dest] = distance[id] + e.w;
prev[e.dest] = Some(id);
}
}
}
}
//If self.graph contains a negative cycle, then at this point the graph described
//by prev (which is a directed 1-forest/functional graph)
//must contain a cycle. We list the cycles of prev.
let cycles_prev = cycles_of_1_forest(&prev);
//Remark that the cycle in prev is in the reverse order compared to the cycle
//in the graph. Thus the .rev().
return cycles_prev.iter().map(|cycle| cycle.iter().rev().map(
|id| self.idtovertex[*id]
).collect() ).collect();
}
//Remark that the cycle in prev is in the reverse order compared to the cycle
//in the graph. Thus the .rev().
return cycles_prev
.iter()
.map(|cycle| cycle.iter().rev().map(|id| self.idtovertex[*id]).collect())
.collect();
}
} }
//This function returns the list of cycles of a directed 1 forest. It does not //This function returns the list of cycles of a directed 1 forest. It does not
//check for the consistency of the input. //check for the consistency of the input.
fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> { fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
let mut cycles = Vec::<Vec::<usize>>::new(); let mut cycles = Vec::<Vec<usize>>::new();
let mut time_of_discovery = vec![None; forest.len()]; let mut time_of_discovery = vec![None; forest.len()];
for t in 0..forest.len(){ for t in 0..forest.len() {
let mut id = t; let mut id = t;
//while we are on a valid undiscovered node //while we are on a valid undiscovered node
while time_of_discovery[id] == None { while time_of_discovery[id] == None {
time_of_discovery[id] = Some(t); time_of_discovery[id] = Some(t);
if let Some(i) = forest[id] { if let Some(i) = forest[id] {
id = i; id = i;
} } else {
else{ break;
break; }
} }
} if forest[id] != None && time_of_discovery[id] == Some(t) {
if forest[id] != None && time_of_discovery[id] == Some(t) { //We discovered an id that we explored at this iteration t.
//We discovered an id that we explored at this iteration t. //It means we are on a cycle
//It means we are on a cycle let mut cy = vec![id; 1];
let mut cy = vec![id; 1]; let mut id2 = id;
let mut id2 = id; while let Some(id_next) = forest[id2] {
while let Some(id_next) = forest[id2] { id2 = id_next;
id2 = id_next; if id2 != id {
if id2 != id { cy.push(id2);
cy.push(id2); } else {
} break;
else { }
break; }
} cycles.push(cy);
} }
cycles.push(cy); }
} cycles
}
cycles
} }

File diff suppressed because it is too large Load diff

View file

@ -7,12 +7,11 @@ mod consul;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
mod kubernetes; mod kubernetes;
pub mod layout;
pub mod graph_algo; pub mod graph_algo;
pub mod layout;
pub mod ring; pub mod ring;
pub mod system; pub mod system;
mod metrics; mod metrics;
pub mod rpc_helper; pub mod rpc_helper;

View file

@ -565,7 +565,6 @@ impl System {
return Err(Error::Message(msg)); return Err(Error::Message(msg));
} }
let update_ring = self.update_ring.lock().await; let update_ring = self.update_ring.lock().await;
let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); let mut layout: ClusterLayout = self.ring.borrow().layout.clone();