Garage v0.9 #473

Merged
lx merged 175 commits from next into main 2023-10-10 13:28:29 +00:00
2 changed files with 795 additions and 745 deletions
Showing only changes of commit 2aeaddd5e2 - Show all commits

View file

@ -1,12 +1,12 @@
use std::cmp::min;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::cmp::{min}; use std::collections::HashMap;
use std::collections::{HashMap};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::bipartite::*;
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*; use garage_util::data::*;
use garage_util::bipartite::*;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
@ -168,454 +168,506 @@ impl ClusterLayout {
true true
} }
/// This function calculates a new partition-to-node assignation.
/// The computed assignation maximizes the capacity of a
/// partition (assuming all partitions have the same size).
/// Among such optimal assignation, it minimizes the distance to
/// the former assignation (if any) to minimize the amount of
/// data to be moved. A heuristic ensures node triplets
/// dispersion (in garage_util::bipartite::optimize_matching()).
pub fn calculate_partition_assignation(&mut self) -> bool {
//The nodes might have been updated, some might have been deleted.
//So we need to first update the list of nodes and retrieve the
//assignation.
let old_node_assignation = self.update_nodes_and_ring();
/// This function calculates a new partition-to-node assignation. let (node_zone, _) = self.get_node_zone_capacity();
/// The computed assignation maximizes the capacity of a
/// partition (assuming all partitions have the same size).
/// Among such optimal assignation, it minimizes the distance to
/// the former assignation (if any) to minimize the amount of
/// data to be moved. A heuristic ensures node triplets
/// dispersion (in garage_util::bipartite::optimize_matching()).
pub fn calculate_partition_assignation(&mut self) -> bool {
//The nodes might have been updated, some might have been deleted.
//So we need to first update the list of nodes and retrieve the
//assignation.
let old_node_assignation = self.update_nodes_and_ring();
let (node_zone, _) = self.get_node_zone_capacity(); //We compute the optimal number of partition to assign to
//every node and zone.
//We compute the optimal number of partition to assign to if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions() {
//every node and zone. //We collect part_per_zone in a vec to not rely on the
if let Some((part_per_nod, part_per_zone)) = self.optimal_proportions(){ //arbitrary order in which elements are iterated in
//We collect part_per_zone in a vec to not rely on the //Hashmap::iter()
//arbitrary order in which elements are iterated in let part_per_zone_vec = part_per_zone
//Hashmap::iter() .iter()
let part_per_zone_vec = part_per_zone.iter() .map(|(x, y)| (x.clone(), *y))
.map(|(x,y)| (x.clone(),*y)) .collect::<Vec<(String, usize)>>();
.collect::<Vec<(String,usize)>>(); //We create an indexing of the zones
//We create an indexing of the zones let mut zone_id = HashMap::<String, usize>::new();
let mut zone_id = HashMap::<String,usize>::new(); for i in 0..part_per_zone_vec.len() {
for i in 0..part_per_zone_vec.len(){ zone_id.insert(part_per_zone_vec[i].0.clone(), i);
zone_id.insert(part_per_zone_vec[i].0.clone(), i); }
}
//We compute a candidate for the new partition to zone
//assignation.
let nb_zones = part_per_zone.len();
let nb_nodes = part_per_nod.len();
let nb_partitions = 1<<PARTITION_BITS;
let left_cap_vec = vec![self.replication_factor as u32 ; nb_partitions];
let right_cap_vec = part_per_zone_vec.iter().map(|(_,y)| *y as u32)
.collect();
let mut zone_assignation =
dinic_compute_matching(left_cap_vec, right_cap_vec);
//We compute a candidate for the new partition to zone
//We create the structure for the partition-to-node assignation. //assignation.
let mut node_assignation = let nb_zones = part_per_zone.len();
vec![vec![None; self.replication_factor ];nb_partitions]; let nb_nodes = part_per_nod.len();
//We will decrement part_per_nod to keep track of the number let nb_partitions = 1 << PARTITION_BITS;
//of partitions that we still have to associate. let left_cap_vec = vec![self.replication_factor as u32; nb_partitions];
let mut part_per_nod = part_per_nod.clone(); let right_cap_vec = part_per_zone_vec.iter().map(|(_, y)| *y as u32).collect();
let mut zone_assignation = dinic_compute_matching(left_cap_vec, right_cap_vec);
//We minimize the distance to the former assignation(if any)
//We get the id of the zones of the former assignation
//(and the id no_zone if there is no node assignated)
let no_zone = part_per_zone_vec.len();
let old_zone_assignation : Vec<Vec<usize>> =
old_node_assignation.iter().map(|x| x.iter().map(
|id| match *id { Some(i) => zone_id[&node_zone[i]] ,
None => no_zone }
).collect()).collect();
//We minimize the distance to the former zone assignation //We create the structure for the partition-to-node assignation.
zone_assignation = optimize_matching( let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions];
&old_zone_assignation, &zone_assignation, nb_zones+1); //+1 for no_zone //We will decrement part_per_nod to keep track of the number
//of partitions that we still have to associate.
let mut part_per_nod = part_per_nod.clone();
//We need to assign partitions to nodes in their zone //We minimize the distance to the former assignation(if any)
//We first put the nodes assignation that can stay the same
for i in 0..nb_partitions{
for j in 0..self.replication_factor {
if let Some(Some(former_node)) = old_node_assignation[i].iter().find(
|x| if let Some(id) = x {
zone_id[&node_zone[*id]] == zone_assignation[i][j]
}
else {false}
)
{
if part_per_nod[*former_node] > 0 {
node_assignation[i][j] = Some(*former_node);
part_per_nod[*former_node] -= 1;
}
}
}
}
//We get the id of the zones of the former assignation
//We complete the assignation of partitions to nodes //(and the id no_zone if there is no node assignated)
let mut rng = rand::thread_rng(); let no_zone = part_per_zone_vec.len();
for i in 0..nb_partitions { let old_zone_assignation: Vec<Vec<usize>> = old_node_assignation
for j in 0..self.replication_factor { .iter()
if node_assignation[i][j] == None { .map(|x| {
let possible_nodes : Vec<usize> = (0..nb_nodes) x.iter()
.filter( .map(|id| match *id {
|id| zone_id[&node_zone[*id]] == zone_assignation[i][j] Some(i) => zone_id[&node_zone[i]],
&& part_per_nod[*id] > 0).collect(); None => no_zone,
assert!(possible_nodes.len()>0); })
//We randomly pick a node .collect()
if let Some(nod) = possible_nodes.choose(&mut rng){ })
node_assignation[i][j] = Some(*nod); .collect();
part_per_nod[*nod] -= 1;
}
}
}
}
//We write the assignation in the 1D table //We minimize the distance to the former zone assignation
self.ring_assignation_data = Vec::<CompactNodeType>::new(); zone_assignation =
for i in 0..nb_partitions{ optimize_matching(&old_zone_assignation, &zone_assignation, nb_zones + 1); //+1 for no_zone
for j in 0..self.replication_factor {
if let Some(id) = node_assignation[i][j] {
self.ring_assignation_data.push(id as CompactNodeType);
}
else {assert!(false)}
}
}
true //We need to assign partitions to nodes in their zone
} //We first put the nodes assignation that can stay the same
else { false } for i in 0..nb_partitions {
} for j in 0..self.replication_factor {
if let Some(Some(former_node)) = old_node_assignation[i].iter().find(|x| {
if let Some(id) = x {
zone_id[&node_zone[*id]] == zone_assignation[i][j]
} else {
false
}
}) {
if part_per_nod[*former_node] > 0 {
node_assignation[i][j] = Some(*former_node);
part_per_nod[*former_node] -= 1;
}
}
}
}
/// The LwwMap of node roles might have changed. This function updates the node_id_vec //We complete the assignation of partitions to nodes
/// and returns the assignation given by ring, with the new indices of the nodes, and let mut rng = rand::thread_rng();
/// None of the node is not present anymore. for i in 0..nb_partitions {
/// We work with the assumption that only this function and calculate_new_assignation for j in 0..self.replication_factor {
/// do modify assignation_ring and node_id_vec. if node_assignation[i][j] == None {
fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> { let possible_nodes: Vec<usize> = (0..nb_nodes)
let nb_partitions = 1usize<<PARTITION_BITS; .filter(|id| {
let mut node_assignation = zone_id[&node_zone[*id]] == zone_assignation[i][j]
vec![vec![None; self.replication_factor ];nb_partitions]; && part_per_nod[*id] > 0
let rf = self.replication_factor; })
let ring = &self.ring_assignation_data; .collect();
assert!(possible_nodes.len() > 0);
let new_node_id_vec : Vec::<Uuid> = self.roles.items().iter() //We randomly pick a node
.map(|(k, _, _)| *k) if let Some(nod) = possible_nodes.choose(&mut rng) {
.collect(); node_assignation[i][j] = Some(*nod);
part_per_nod[*nod] -= 1;
if ring.len() == rf*nb_partitions { }
for i in 0..nb_partitions { }
for j in 0..self.replication_factor { }
node_assignation[i][j] = new_node_id_vec.iter() }
.position(|id| *id == self.node_id_vec[ring[i*rf + j] as usize]);
}
}
}
self.node_id_vec = new_node_id_vec; //We write the assignation in the 1D table
self.ring_assignation_data = vec![]; self.ring_assignation_data = Vec::<CompactNodeType>::new();
return node_assignation; for i in 0..nb_partitions {
} for j in 0..self.replication_factor {
if let Some(id) = node_assignation[i][j] {
///This function compute the number of partition to assign to self.ring_assignation_data.push(id as CompactNodeType);
///every node and zone, so that every partition is replicated } else {
///self.replication_factor times and the capacity of a partition assert!(false)
///is maximized. }
fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> { }
}
let mut zone_capacity :HashMap<String, u32>= HashMap::new();
let (node_zone, node_capacity) = self.get_node_zone_capacity();
let nb_nodes = self.node_id_vec.len();
for i in 0..nb_nodes true
{ } else {
if zone_capacity.contains_key(&node_zone[i]) { false
zone_capacity.insert(node_zone[i].clone(), zone_capacity[&node_zone[i]] + node_capacity[i]); }
} }
else{
zone_capacity.insert(node_zone[i].clone(), node_capacity[i]);
}
}
//Compute the optimal number of partitions per zone /// The LwwMap of node roles might have changed. This function updates the node_id_vec
let sum_capacities: u32 =zone_capacity.values().sum(); /// and returns the assignation given by ring, with the new indices of the nodes, and
/// None of the node is not present anymore.
if sum_capacities <= 0 { /// We work with the assumption that only this function and calculate_new_assignation
println!("No storage capacity in the network."); /// do modify assignation_ring and node_id_vec.
return None; fn update_nodes_and_ring(&mut self) -> Vec<Vec<Option<usize>>> {
} let nb_partitions = 1usize << PARTITION_BITS;
let mut node_assignation = vec![vec![None; self.replication_factor]; nb_partitions];
let rf = self.replication_factor;
let ring = &self.ring_assignation_data;
let nb_partitions = 1<<PARTITION_BITS; let new_node_id_vec: Vec<Uuid> = self.roles.items().iter().map(|(k, _, _)| *k).collect();
//Initially we would like to use zones porportionally to
//their capacity.
//However, a large zone can be associated to at most
//nb_partitions to ensure replication of the date.
//So we take the min with nb_partitions:
let mut part_per_zone : HashMap<String, usize> =
zone_capacity.iter()
.map(|(k, v)| (k.clone(), min(nb_partitions,
(self.replication_factor*nb_partitions
**v as usize)/sum_capacities as usize) ) ).collect();
//The replication_factor-1 upper bounds the number of if ring.len() == rf * nb_partitions {
//part_per_zones that are greater than nb_partitions for i in 0..nb_partitions {
for _ in 1..self.replication_factor { for j in 0..self.replication_factor {
//The number of partitions that are not assignated to node_assignation[i][j] = new_node_id_vec
//a zone that takes nb_partitions. .iter()
let sum_capleft : u32 = zone_capacity.keys() .position(|id| *id == self.node_id_vec[ring[i * rf + j] as usize]);
.filter(| k | {part_per_zone[*k] < nb_partitions} ) }
.map(|k| zone_capacity[k]).sum(); }
}
//The number of replication of the data that we need
//to ensure.
let repl_left = self.replication_factor
- part_per_zone.values()
.filter(|x| {**x == nb_partitions})
.count();
if repl_left == 0 {
break;
}
for k in zone_capacity.keys() { self.node_id_vec = new_node_id_vec;
if part_per_zone[k] != nb_partitions self.ring_assignation_data = vec![];
{ return node_assignation;
part_per_zone.insert(k.to_string() , min(nb_partitions, }
(nb_partitions*zone_capacity[k] as usize
*repl_left)/sum_capleft as usize));
}
}
}
//Now we divide the zone's partition share proportionally ///This function compute the number of partition to assign to
//between their nodes. ///every node and zone, so that every partition is replicated
///self.replication_factor times and the capacity of a partition
let mut part_per_nod : Vec<usize> = (0..nb_nodes).map( ///is maximized.
|i| (part_per_zone[&node_zone[i]]*node_capacity[i] as usize)/zone_capacity[&node_zone[i]] as usize fn optimal_proportions(&mut self) -> Option<(Vec<usize>, HashMap<String, usize>)> {
) let mut zone_capacity: HashMap<String, u32> = HashMap::new();
.collect();
//We must update the part_per_zone to make it correspond to let (node_zone, node_capacity) = self.get_node_zone_capacity();
//part_per_nod (because of integer rounding) let nb_nodes = self.node_id_vec.len();
part_per_zone = part_per_zone.iter().map(|(k,_)|
(k.clone(), 0))
.collect();
for i in 0..nb_nodes {
part_per_zone.insert(
node_zone[i].clone() ,
part_per_zone[&node_zone[i]] + part_per_nod[i]);
}
//Because of integer rounding, the total sum of part_per_nod for i in 0..nb_nodes {
//might not be replication_factor*nb_partitions. if zone_capacity.contains_key(&node_zone[i]) {
// We need at most to add 1 to every non maximal value of zone_capacity.insert(
// part_per_nod. The capacity of a partition will be bounded node_zone[i].clone(),
// by the minimal value of zone_capacity[&node_zone[i]] + node_capacity[i],
// node_capacity_vec[i]/part_per_nod[i] );
// so we try to maximize this minimal value, keeping the } else {
// part_per_zone capped zone_capacity.insert(node_zone[i].clone(), node_capacity[i]);
}
}
let discrepancy : usize = //Compute the optimal number of partitions per zone
nb_partitions*self.replication_factor let sum_capacities: u32 = zone_capacity.values().sum();
- part_per_nod.iter().sum::<usize>();
//We use a stupid O(N^2) algorithm. If the number of nodes
//is actually expected to be high, one should optimize this.
for _ in 0..discrepancy { if sum_capacities <= 0 {
if let Some(idmax) = (0..nb_nodes) println!("No storage capacity in the network.");
.filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions) return None;
.max_by( |i,j| }
(node_capacity[*i]*(part_per_nod[*j]+1) as u32)
.cmp(&(node_capacity[*j]*(part_per_nod[*i]+1) as u32))
)
{
part_per_nod[idmax] += 1;
part_per_zone.insert(node_zone[idmax].clone(),part_per_zone[&node_zone[idmax]]+1);
}
}
//We check the algorithm consistency let nb_partitions = 1 << PARTITION_BITS;
let discrepancy : usize =
nb_partitions*self.replication_factor
- part_per_nod.iter().sum::<usize>();
assert!(discrepancy == 0);
assert!(if let Some(v) = part_per_zone.values().max()
{*v <= nb_partitions} else {false} );
Some((part_per_nod, part_per_zone))
}
//Returns vectors of zone and capacity; indexed by the same (temporary)
//indices as node_id_vec.
fn get_node_zone_capacity(& self) -> (Vec<String> , Vec<u32>) {
let node_zone = self.node_id_vec.iter().map(
|id_nod| match self.node_role(id_nod) {
Some(NodeRole{zone,capacity:_,tags:_}) => zone.clone() ,
_ => "".to_string()
}
).collect();
let node_capacity = self.node_id_vec.iter().map(
|id_nod| match self.node_role(id_nod) {
Some(NodeRole{zone:_,capacity,tags:_}) =>
if let Some(c)=capacity
{*c}
else {0},
_ => 0
}
).collect();
(node_zone,node_capacity) //Initially we would like to use zones porportionally to
} //their capacity.
//However, a large zone can be associated to at most
//nb_partitions to ensure replication of the date.
//So we take the min with nb_partitions:
let mut part_per_zone: HashMap<String, usize> = zone_capacity
.iter()
.map(|(k, v)| {
(
k.clone(),
min(
nb_partitions,
(self.replication_factor * nb_partitions * *v as usize)
/ sum_capacities as usize,
),
)
})
.collect();
//The replication_factor-1 upper bounds the number of
//part_per_zones that are greater than nb_partitions
for _ in 1..self.replication_factor {
//The number of partitions that are not assignated to
//a zone that takes nb_partitions.
let sum_capleft: u32 = zone_capacity
.keys()
.filter(|k| part_per_zone[*k] < nb_partitions)
.map(|k| zone_capacity[k])
.sum();
//The number of replication of the data that we need
//to ensure.
let repl_left = self.replication_factor
- part_per_zone
.values()
.filter(|x| **x == nb_partitions)
.count();
if repl_left == 0 {
break;
}
for k in zone_capacity.keys() {
if part_per_zone[k] != nb_partitions {
part_per_zone.insert(
k.to_string(),
min(
nb_partitions,
(nb_partitions * zone_capacity[k] as usize * repl_left)
/ sum_capleft as usize,
),
);
}
}
}
//Now we divide the zone's partition share proportionally
//between their nodes.
let mut part_per_nod: Vec<usize> = (0..nb_nodes)
.map(|i| {
(part_per_zone[&node_zone[i]] * node_capacity[i] as usize)
/ zone_capacity[&node_zone[i]] as usize
})
.collect();
//We must update the part_per_zone to make it correspond to
//part_per_nod (because of integer rounding)
part_per_zone = part_per_zone.iter().map(|(k, _)| (k.clone(), 0)).collect();
for i in 0..nb_nodes {
part_per_zone.insert(
node_zone[i].clone(),
part_per_zone[&node_zone[i]] + part_per_nod[i],
);
}
//Because of integer rounding, the total sum of part_per_nod
//might not be replication_factor*nb_partitions.
// We need at most to add 1 to every non maximal value of
// part_per_nod. The capacity of a partition will be bounded
// by the minimal value of
// node_capacity_vec[i]/part_per_nod[i]
// so we try to maximize this minimal value, keeping the
// part_per_zone capped
let discrepancy: usize =
nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>();
//We use a stupid O(N^2) algorithm. If the number of nodes
//is actually expected to be high, one should optimize this.
for _ in 0..discrepancy {
if let Some(idmax) = (0..nb_nodes)
.filter(|i| part_per_zone[&node_zone[*i]] < nb_partitions)
.max_by(|i, j| {
(node_capacity[*i] * (part_per_nod[*j] + 1) as u32)
.cmp(&(node_capacity[*j] * (part_per_nod[*i] + 1) as u32))
}) {
part_per_nod[idmax] += 1;
part_per_zone.insert(
node_zone[idmax].clone(),
part_per_zone[&node_zone[idmax]] + 1,
);
}
}
//We check the algorithm consistency
let discrepancy: usize =
nb_partitions * self.replication_factor - part_per_nod.iter().sum::<usize>();
assert!(discrepancy == 0);
assert!(if let Some(v) = part_per_zone.values().max() {
*v <= nb_partitions
} else {
false
});
Some((part_per_nod, part_per_zone))
}
//Returns vectors of zone and capacity; indexed by the same (temporary)
//indices as node_id_vec.
fn get_node_zone_capacity(&self) -> (Vec<String>, Vec<u32>) {
let node_zone = self
.node_id_vec
.iter()
.map(|id_nod| match self.node_role(id_nod) {
Some(NodeRole {
zone,
capacity: _,
tags: _,
}) => zone.clone(),
_ => "".to_string(),
})
.collect();
let node_capacity = self
.node_id_vec
.iter()
.map(|id_nod| match self.node_role(id_nod) {
Some(NodeRole {
zone: _,
capacity,
tags: _,
}) => {
if let Some(c) = capacity {
*c
} else {
0
}
}
_ => 0,
})
.collect();
(node_zone, node_capacity)
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use itertools::Itertools; use itertools::Itertools;
fn check_assignation(cl : &ClusterLayout) { fn check_assignation(cl: &ClusterLayout) {
//Check that input data has the right format
//Check that input data has the right format let nb_partitions = 1usize << PARTITION_BITS;
let nb_partitions = 1usize<<PARTITION_BITS; assert!([1, 2, 3].contains(&cl.replication_factor));
assert!([1,2,3].contains(&cl.replication_factor)); assert!(cl.ring_assignation_data.len() == nb_partitions * cl.replication_factor);
assert!(cl.ring_assignation_data.len() == nb_partitions*cl.replication_factor);
let (node_zone, node_capacity) = cl.get_node_zone_capacity();
//Check that is is a correct assignation with zone redundancy let (node_zone, node_capacity) = cl.get_node_zone_capacity();
let rf = cl.replication_factor;
for i in 0..nb_partitions{
assert!( rf ==
cl.ring_assignation_data[rf*i..rf*(i+1)].iter()
.map(|nod| node_zone[*nod as usize].clone())
.unique()
.count() );
}
let nb_nodes = cl.node_id_vec.len(); //Check that is is a correct assignation with zone redundancy
//Check optimality let rf = cl.replication_factor;
let node_nb_part =(0..nb_nodes).map(|i| cl.ring_assignation_data for i in 0..nb_partitions {
.iter() assert!(
.filter(|x| **x==i as u8) rf == cl.ring_assignation_data[rf * i..rf * (i + 1)]
.count()) .iter()
.collect::<Vec::<_>>(); .map(|nod| node_zone[*nod as usize].clone())
.unique()
.count()
);
}
let zone_vec = node_zone.iter().unique().collect::<Vec::<_>>(); let nb_nodes = cl.node_id_vec.len();
let zone_nb_part = zone_vec.iter().map( |z| cl.ring_assignation_data.iter() //Check optimality
.filter(|x| node_zone[**x as usize] == **z) let node_nb_part = (0..nb_nodes)
.count() .map(|i| {
).collect::<Vec::<_>>(); cl.ring_assignation_data
.iter()
//Check optimality of the zone assignation : would it be better for the .filter(|x| **x == i as u8)
//node_capacity/node_partitions ratio to change the assignation of a partition .count()
})
if let Some(idmin) = (0..nb_nodes).min_by( .collect::<Vec<_>>();
|i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
.cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
){
if let Some(idnew) = (0..nb_nodes)
.filter( |i| if let Some(p) = zone_vec.iter().position(|z| **z==node_zone[*i])
{zone_nb_part[p] < nb_partitions }
else { false })
.max_by(
|i,j| (node_capacity[*i]*(node_nb_part[*j]as u32+1))
.cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
){
assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
node_capacity[idnew]*node_nb_part[idmin] as u32);
}
} let zone_vec = node_zone.iter().unique().collect::<Vec<_>>();
let zone_nb_part = zone_vec
.iter()
.map(|z| {
cl.ring_assignation_data
.iter()
.filter(|x| node_zone[**x as usize] == **z)
.count()
})
.collect::<Vec<_>>();
//In every zone, check optimality of the nod assignation //Check optimality of the zone assignation : would it be better for the
for z in zone_vec { //node_capacity/node_partitions ratio to change the assignation of a partition
let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z );
if let Some(idmin) = node_of_z_iter.clone().min_by(
|i,j| (node_capacity[*i]*node_nb_part[*j] as u32)
.cmp(&(node_capacity[*j]*node_nb_part[*i] as u32))
){
if let Some(idnew) = node_of_z_iter.min_by(
|i,j| (node_capacity[*i]*(node_nb_part[*j] as u32+1))
.cmp(&(node_capacity[*j]*(node_nb_part[*i] as u32+1)))
){
assert!(node_capacity[idmin]*(node_nb_part[idnew] as u32+1) >=
node_capacity[idnew]*node_nb_part[idmin] as u32);
}
}
}
}
fn update_layout(cl : &mut ClusterLayout, node_id_vec : &Vec<u8>, if let Some(idmin) = (0..nb_nodes).min_by(|i, j| {
node_capacity_vec : &Vec<u32> , node_zone_vec : &Vec<String>) { (node_capacity[*i] * node_nb_part[*j] as u32)
for i in 0..node_id_vec.len(){ .cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
if let Some(x) = FixedBytes32::try_from(&[i as u8;32]) { }) {
cl.node_id_vec.push(x); if let Some(idnew) = (0..nb_nodes)
} .filter(|i| {
if let Some(p) = zone_vec.iter().position(|z| **z == node_zone[*i]) {
let update = cl.roles.update_mutator(cl.node_id_vec[i] , zone_nb_part[p] < nb_partitions
NodeRoleV(Some(NodeRole{ } else {
zone : (node_zone_vec[i].to_string()), false
capacity : (Some(node_capacity_vec[i])), }
tags : (vec![])}))); })
cl.roles.merge(&update); .max_by(|i, j| {
} (node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
} .cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
}) {
assert!(
node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
>= node_capacity[idnew] * node_nb_part[idmin] as u32
);
}
}
#[test] //In every zone, check optimality of the nod assignation
fn test_assignation() { for z in zone_vec {
let node_of_z_iter = (0..nb_nodes).filter(|id| node_zone[*id] == *z);
if let Some(idmin) = node_of_z_iter.clone().min_by(|i, j| {
(node_capacity[*i] * node_nb_part[*j] as u32)
.cmp(&(node_capacity[*j] * node_nb_part[*i] as u32))
}) {
if let Some(idnew) = node_of_z_iter.min_by(|i, j| {
(node_capacity[*i] * (node_nb_part[*j] as u32 + 1))
.cmp(&(node_capacity[*j] * (node_nb_part[*i] as u32 + 1)))
}) {
assert!(
node_capacity[idmin] * (node_nb_part[idnew] as u32 + 1)
>= node_capacity[idnew] * node_nb_part[idmin] as u32
);
}
}
}
}
let mut node_id_vec = vec![1,2,3]; fn update_layout(
let mut node_capacity_vec = vec![4000,1000,2000]; cl: &mut ClusterLayout,
let mut node_zone_vec= vec!["A", "B", "C"].into_iter().map(|x| x.to_string()).collect(); node_id_vec: &Vec<u8>,
node_capacity_vec: &Vec<u32>,
let mut cl = ClusterLayout { node_zone_vec: &Vec<String>,
node_id_vec: vec![], ) {
for i in 0..node_id_vec.len() {
roles : LwwMap::new(), if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
cl.node_id_vec.push(x);
}
replication_factor: 3, let update = cl.roles.update_mutator(
ring_assignation_data : vec![], cl.node_id_vec[i],
version:0, NodeRoleV(Some(NodeRole {
staging: LwwMap::new(), zone: (node_zone_vec[i].to_string()),
staging_hash: sha256sum(&[1;32]), capacity: (Some(node_capacity_vec[i])),
}; tags: (vec![]),
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); })),
cl.calculate_partition_assignation(); );
check_assignation(&cl); cl.roles.merge(&update);
}
}
node_id_vec = vec![1,2,3, 4, 5, 6, 7, 8, 9]; #[test]
node_capacity_vec = vec![4000,1000,1000, 3000, 1000, 1000, 2000, 10000, 2000]; fn test_assignation() {
node_zone_vec= vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"].into_iter().map(|x| x.to_string()).collect(); let mut node_id_vec = vec![1, 2, 3];
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); let mut node_capacity_vec = vec![4000, 1000, 2000];
cl.calculate_partition_assignation(); let mut node_zone_vec = vec!["A", "B", "C"]
check_assignation(&cl); .into_iter()
.map(|x| x.to_string())
.collect();
node_capacity_vec = vec![4000,1000,2000, 7000, 1000, 1000, 2000, 10000, 2000]; let mut cl = ClusterLayout {
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); node_id_vec: vec![],
cl.calculate_partition_assignation();
check_assignation(&cl);
roles: LwwMap::new(),
node_capacity_vec = vec![4000,4000,2000, 7000, 1000, 9000, 2000, 10, 2000]; replication_factor: 3,
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec); ring_assignation_data: vec![],
cl.calculate_partition_assignation(); version: 0,
check_assignation(&cl); staging: LwwMap::new(),
staging_hash: sha256sum(&[1; 32]),
} };
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
.into_iter()
.map(|x| x.to_string())
.collect();
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
node_capacity_vec = vec![4000, 4000, 2000, 7000, 1000, 9000, 2000, 10, 2000];
update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec);
cl.calculate_partition_assignation();
check_assignation(&cl);
}
} }

View file

@ -1,378 +1,376 @@
/* /*
* This module deals with graph algorithm in complete bipartite * This module deals with graph algorithm in complete bipartite
* graphs. It is used in layout.rs to build the partition to node * graphs. It is used in layout.rs to build the partition to node
* assignation. * assignation.
* */ * */
use std::cmp::{min,max};
use std::collections::VecDeque;
use rand::prelude::SliceRandom; use rand::prelude::SliceRandom;
use std::cmp::{max, min};
use std::collections::VecDeque;
//Graph data structure for the flow algorithm. //Graph data structure for the flow algorithm.
#[derive(Clone,Copy,Debug)] #[derive(Clone, Copy, Debug)]
struct EdgeFlow{ struct EdgeFlow {
c : i32, c: i32,
flow : i32, flow: i32,
v : usize, v: usize,
rev : usize, rev: usize,
} }
//Graph data structure for the detection of positive cycles. //Graph data structure for the detection of positive cycles.
#[derive(Clone,Copy,Debug)] #[derive(Clone, Copy, Debug)]
struct WeightedEdge{ struct WeightedEdge {
w : i32, w: i32,
u : usize, u: usize,
v : usize, v: usize,
} }
/* This function takes two matchings (old_match and new_match) in a
/* This function takes two matchings (old_match and new_match) in a * complete bipartite graph. It returns a matching that has the
* complete bipartite graph. It returns a matching that has the
* same degree as new_match at every vertex, and that is as close * same degree as new_match at every vertex, and that is as close
* as possible to old_match. * as possible to old_match.
* */ * */
pub fn optimize_matching( old_match : &Vec<Vec<usize>> , pub fn optimize_matching(
new_match : &Vec<Vec<usize>> , old_match: &Vec<Vec<usize>>,
nb_right : usize ) new_match: &Vec<Vec<usize>>,
-> Vec<Vec<usize>> { nb_right: usize,
let nb_left = old_match.len(); ) -> Vec<Vec<usize>> {
let ed = WeightedEdge{w:-1,u:0,v:0}; let nb_left = old_match.len();
let mut edge_vec = vec![ed ; nb_left*nb_right]; let ed = WeightedEdge { w: -1, u: 0, v: 0 };
let mut edge_vec = vec![ed; nb_left * nb_right];
//We build the complete bipartite graph structure, represented
//by the list of all edges.
for i in 0..nb_left {
for j in 0..nb_right{
edge_vec[i*nb_right + j].u = i;
edge_vec[i*nb_right + j].v = nb_left+j;
}
}
for i in 0..edge_vec.len() { //We build the complete bipartite graph structure, represented
//We add the old matchings //by the list of all edges.
if old_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { for i in 0..nb_left {
edge_vec[i].w *= -1; for j in 0..nb_right {
} edge_vec[i * nb_right + j].u = i;
//We add the new matchings edge_vec[i * nb_right + j].v = nb_left + j;
if new_match[edge_vec[i].u].contains(&(edge_vec[i].v-nb_left)) { }
(edge_vec[i].u,edge_vec[i].v) = }
(edge_vec[i].v,edge_vec[i].u);
edge_vec[i].w *= -1; for i in 0..edge_vec.len() {
} //We add the old matchings
} if old_match[edge_vec[i].u].contains(&(edge_vec[i].v - nb_left)) {
//Now edge_vec is a graph where edges are oriented LR if we edge_vec[i].w *= -1;
//can add them to new_match, and RL otherwise. If }
//adding/removing them makes the matching closer to old_match //We add the new matchings
//they have weight 1; and -1 otherwise. if new_match[edge_vec[i].u].contains(&(edge_vec[i].v - nb_left)) {
(edge_vec[i].u, edge_vec[i].v) = (edge_vec[i].v, edge_vec[i].u);
//We shuffle the edge list so that there is no bias depending in edge_vec[i].w *= -1;
//partitions/zone label in the triplet dispersion }
let mut rng = rand::thread_rng(); }
edge_vec.shuffle(&mut rng); //Now edge_vec is a graph where edges are oriented LR if we
//can add them to new_match, and RL otherwise. If
//Discovering and flipping a cycle with positive weight in this //adding/removing them makes the matching closer to old_match
//graph will make the matching closer to old_match. //they have weight 1; and -1 otherwise.
//We use Bellman Ford algorithm to discover positive cycles
loop{ //We shuffle the edge list so that there is no bias depending in
if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) { //partitions/zone label in the triplet dispersion
for i in cycle { let mut rng = rand::thread_rng();
//We flip the edges of the cycle. edge_vec.shuffle(&mut rng);
(edge_vec[i].u,edge_vec[i].v) =
(edge_vec[i].v,edge_vec[i].u); //Discovering and flipping a cycle with positive weight in this
edge_vec[i].w *= -1; //graph will make the matching closer to old_match.
} //We use Bellman Ford algorithm to discover positive cycles
} loop {
else { if let Some(cycle) = positive_cycle(&edge_vec, nb_left, nb_right) {
//If there is no cycle, we return the optimal matching. for i in cycle {
break; //We flip the edges of the cycle.
} (edge_vec[i].u, edge_vec[i].v) = (edge_vec[i].v, edge_vec[i].u);
} edge_vec[i].w *= -1;
}
//The optimal matching is build from the graph structure. } else {
let mut matching = vec![Vec::<usize>::new() ; nb_left]; //If there is no cycle, we return the optimal matching.
for e in edge_vec { break;
if e.u > e.v { }
matching[e.v].push(e.u-nb_left); }
}
} //The optimal matching is build from the graph structure.
matching let mut matching = vec![Vec::<usize>::new(); nb_left];
for e in edge_vec {
if e.u > e.v {
matching[e.v].push(e.u - nb_left);
}
}
matching
} }
//This function finds a positive cycle in a bipartite wieghted graph. //This function finds a positive cycle in a bipartite wieghted graph.
fn positive_cycle( edge_vec : &Vec<WeightedEdge>, nb_left : usize, fn positive_cycle(
nb_right : usize) -> Option<Vec<usize>> { edge_vec: &Vec<WeightedEdge>,
let nb_side_min = min(nb_left, nb_right); nb_left: usize,
let nb_vertices = nb_left+nb_right; nb_right: usize,
let weight_lowerbound = -((nb_left +nb_right) as i32) -1; ) -> Option<Vec<usize>> {
let mut accessed = vec![false ; nb_left]; let nb_side_min = min(nb_left, nb_right);
let nb_vertices = nb_left + nb_right;
//We try to find a positive cycle accessible from the left let weight_lowerbound = -((nb_left + nb_right) as i32) - 1;
//vertex i. let mut accessed = vec![false; nb_left];
for i in 0..nb_left{
if accessed[i] {
continue;
}
let mut weight =vec![weight_lowerbound ; nb_vertices];
let mut prev =vec![ edge_vec.len() ; nb_vertices];
weight[i] = 0;
//We compute largest weighted paths from i.
//Since the graph is bipartite, any simple cycle has length
//at most 2*nb_side_min. In the general Bellman-Ford
//algorithm, the bound here is the number of vertices. Since
//the number of partitions can be much larger than the
//number of nodes, we optimize that.
for _ in 0..(2*nb_side_min) {
for j in 0..edge_vec.len() {
let e = edge_vec[j];
if weight[e.v] < weight[e.u]+e.w {
weight[e.v] = weight[e.u]+e.w;
prev[e.v] = j;
}
}
}
//We update the accessed table
for i in 0..nb_left {
if weight[i] > weight_lowerbound {
accessed[i] = true;
}
}
//We detect positive cycle
for e in edge_vec {
if weight[e.v] < weight[e.u]+e.w {
//it means e is on a path branching from a positive cycle
let mut was_seen = vec![false ; nb_vertices];
let mut curr = e.u;
//We track back with prev until we reach the cycle.
while !was_seen[curr]{
was_seen[curr] = true;
curr = edge_vec[prev[curr]].u;
}
//Now curr is on the cycle. We collect the edges ids.
let mut cycle = Vec::<usize>::new();
cycle.push(prev[curr]);
let mut cycle_vert = edge_vec[prev[curr]].u;
while cycle_vert != curr {
cycle.push(prev[cycle_vert]);
cycle_vert = edge_vec[prev[cycle_vert]].u;
}
return Some(cycle); //We try to find a positive cycle accessible from the left
} //vertex i.
} for i in 0..nb_left {
} if accessed[i] {
continue;
}
let mut weight = vec![weight_lowerbound; nb_vertices];
let mut prev = vec![edge_vec.len(); nb_vertices];
weight[i] = 0;
//We compute largest weighted paths from i.
//Since the graph is bipartite, any simple cycle has length
//at most 2*nb_side_min. In the general Bellman-Ford
//algorithm, the bound here is the number of vertices. Since
//the number of partitions can be much larger than the
//number of nodes, we optimize that.
for _ in 0..(2 * nb_side_min) {
for j in 0..edge_vec.len() {
let e = edge_vec[j];
if weight[e.v] < weight[e.u] + e.w {
weight[e.v] = weight[e.u] + e.w;
prev[e.v] = j;
}
}
}
//We update the accessed table
for i in 0..nb_left {
if weight[i] > weight_lowerbound {
accessed[i] = true;
}
}
//We detect positive cycle
for e in edge_vec {
if weight[e.v] < weight[e.u] + e.w {
//it means e is on a path branching from a positive cycle
let mut was_seen = vec![false; nb_vertices];
let mut curr = e.u;
//We track back with prev until we reach the cycle.
while !was_seen[curr] {
was_seen[curr] = true;
curr = edge_vec[prev[curr]].u;
}
//Now curr is on the cycle. We collect the edges ids.
let mut cycle = Vec::<usize>::new();
cycle.push(prev[curr]);
let mut cycle_vert = edge_vec[prev[curr]].u;
while cycle_vert != curr {
cycle.push(prev[cycle_vert]);
cycle_vert = edge_vec[prev[cycle_vert]].u;
}
None return Some(cycle);
}
}
}
None
} }
// This function takes two arrays of capacity and computes the
// This function takes two arrays of capacity and computes the // maximal matching in the complete bipartite graph such that the
// maximal matching in the complete bipartite graph such that the
// left vertex i is matched to left_cap_vec[i] right vertices, and // left vertex i is matched to left_cap_vec[i] right vertices, and
// the right vertex j is matched to right_cap_vec[j] left vertices. // the right vertex j is matched to right_cap_vec[j] left vertices.
// To do so, we use Dinic's maximum flow algorithm. // To do so, we use Dinic's maximum flow algorithm.
pub fn dinic_compute_matching( left_cap_vec : Vec<u32>, pub fn dinic_compute_matching(left_cap_vec: Vec<u32>, right_cap_vec: Vec<u32>) -> Vec<Vec<usize>> {
right_cap_vec : Vec<u32>) -> Vec< Vec<usize> > let mut graph = Vec::<Vec<EdgeFlow>>::new();
{ let ed = EdgeFlow {
let mut graph = Vec::<Vec::<EdgeFlow> >::new(); c: 0,
let ed = EdgeFlow{c:0,flow:0,v:0, rev:0}; flow: 0,
v: 0,
rev: 0,
};
// 0 will be the source // 0 will be the source
graph.push(vec![ed ; left_cap_vec.len()]); graph.push(vec![ed; left_cap_vec.len()]);
for i in 0..left_cap_vec.len() for i in 0..left_cap_vec.len() {
{ graph[0][i].c = left_cap_vec[i] as i32;
graph[0][i].c = left_cap_vec[i] as i32; graph[0][i].v = i + 2;
graph[0][i].v = i+2; graph[0][i].rev = 0;
graph[0][i].rev = 0; }
}
//1 will be the sink //1 will be the sink
graph.push(vec![ed ; right_cap_vec.len()]); graph.push(vec![ed; right_cap_vec.len()]);
for i in 0..right_cap_vec.len() for i in 0..right_cap_vec.len() {
{ graph[1][i].c = right_cap_vec[i] as i32;
graph[1][i].c = right_cap_vec[i] as i32; graph[1][i].v = i + 2 + left_cap_vec.len();
graph[1][i].v = i+2+left_cap_vec.len(); graph[1][i].rev = 0;
graph[1][i].rev = 0; }
}
//we add left vertices
for i in 0..left_cap_vec.len() {
graph.push(vec![ed ; 1+right_cap_vec.len()]);
graph[i+2][0].c = 0; //directed
graph[i+2][0].v = 0;
graph[i+2][0].rev = i;
for j in 0..right_cap_vec.len() { //we add left vertices
graph[i+2][j+1].c = 1; for i in 0..left_cap_vec.len() {
graph[i+2][j+1].v = 2+left_cap_vec.len()+j; graph.push(vec![ed; 1 + right_cap_vec.len()]);
graph[i+2][j+1].rev = i+1; graph[i + 2][0].c = 0; //directed
} graph[i + 2][0].v = 0;
} graph[i + 2][0].rev = i;
//we add right vertices for j in 0..right_cap_vec.len() {
for i in 0..right_cap_vec.len() { graph[i + 2][j + 1].c = 1;
let lft_ln = left_cap_vec.len(); graph[i + 2][j + 1].v = 2 + left_cap_vec.len() + j;
graph.push(vec![ed ; 1+lft_ln]); graph[i + 2][j + 1].rev = i + 1;
graph[i+lft_ln+2][0].c = graph[1][i].c; }
graph[i+lft_ln+2][0].v = 1; }
graph[i+lft_ln+2][0].rev = i;
for j in 0..left_cap_vec.len() { //we add right vertices
graph[i+2+lft_ln][j+1].c = 0; //directed for i in 0..right_cap_vec.len() {
graph[i+2+lft_ln][j+1].v = j+2; let lft_ln = left_cap_vec.len();
graph[i+2+lft_ln][j+1].rev = i+1; graph.push(vec![ed; 1 + lft_ln]);
} graph[i + lft_ln + 2][0].c = graph[1][i].c;
} graph[i + lft_ln + 2][0].v = 1;
graph[i + lft_ln + 2][0].rev = i;
//To ensure the dispersion of the triplets generated by the for j in 0..left_cap_vec.len() {
//assignation, we shuffle the neighbours of the nodes. Hence, graph[i + 2 + lft_ln][j + 1].c = 0; //directed
//left vertices do not consider the right ones in the same order. graph[i + 2 + lft_ln][j + 1].v = j + 2;
let mut rng = rand::thread_rng(); graph[i + 2 + lft_ln][j + 1].rev = i + 1;
for i in 0..graph.len() { }
graph[i].shuffle(&mut rng); }
//We need to update the ids of the reverse edges.
for j in 0..graph[i].len() {
let target_v = graph[i][j].v;
let target_rev = graph[i][j].rev;
graph[target_v][target_rev].rev = j;
}
}
let nb_vertices = graph.len(); //To ensure the dispersion of the triplets generated by the
//assignation, we shuffle the neighbours of the nodes. Hence,
//We run Dinic's max flow algorithm //left vertices do not consider the right ones in the same order.
loop{ let mut rng = rand::thread_rng();
//We build the level array from Dinic's algorithm. for i in 0..graph.len() {
let mut level = vec![-1; nb_vertices]; graph[i].shuffle(&mut rng);
//We need to update the ids of the reverse edges.
for j in 0..graph[i].len() {
let target_v = graph[i][j].v;
let target_rev = graph[i][j].rev;
graph[target_v][target_rev].rev = j;
}
}
let mut fifo = VecDeque::new(); let nb_vertices = graph.len();
fifo.push_back((0,0));
while !fifo.is_empty() {
if let Some((id,lvl)) = fifo.pop_front(){
if level[id] == -1 {
level[id] = lvl;
for e in graph[id].iter(){
if e.c-e.flow > 0{
fifo.push_back((e.v,lvl+1));
}
}
}
}
}
if level[1] == -1 {
//There is no residual flow
break;
}
//Now we run DFS respecting the level array //We run Dinic's max flow algorithm
let mut next_nbd = vec![0; nb_vertices]; loop {
let mut lifo = VecDeque::new(); //We build the level array from Dinic's algorithm.
let mut level = vec![-1; nb_vertices];
let flow_upper_bound;
if let Some(x) = left_cap_vec.iter().max() {
flow_upper_bound=*x as i32;
}
else {
flow_upper_bound = 0;
assert!(false);
}
lifo.push_back((0,flow_upper_bound));
loop
{
if let Some((id_tmp, f_tmp)) = lifo.back() {
let id = *id_tmp;
let f = *f_tmp;
if id == 1 {
//The DFS reached the sink, we can add a
//residual flow.
lifo.pop_back();
while !lifo.is_empty() {
if let Some((id,_)) = lifo.pop_back(){
let nbd=next_nbd[id];
graph[id][nbd].flow += f;
let id_v = graph[id][nbd].v;
let nbd_v = graph[id][nbd].rev;
graph[id_v][nbd_v].flow -= f;
}
}
lifo.push_back((0,flow_upper_bound));
continue;
}
//else we did not reach the sink
let nbd = next_nbd[id];
if nbd >= graph[id].len() {
//There is nothing to explore from id anymore
lifo.pop_back();
if let Some((parent, _)) = lifo.back(){
next_nbd[*parent] +=1;
}
continue;
}
//else we can try to send flow from id to its nbd
let new_flow = min(f,graph[id][nbd].c
- graph[id][nbd].flow);
if level[graph[id][nbd].v] <= level[id] ||
new_flow == 0 {
//We cannot send flow to nbd.
next_nbd[id] += 1;
continue;
}
//otherwise, we send flow to nbd.
lifo.push_back((graph[id][nbd].v, new_flow));
}
else {
break;
}
}
}
//We return the association
let assoc_table = (0..left_cap_vec.len()).map(
|id| graph[id+2].iter()
.filter(|e| e.flow > 0)
.map( |e| e.v-2-left_cap_vec.len())
.collect()).collect();
//consistency check let mut fifo = VecDeque::new();
fifo.push_back((0, 0));
//it is a flow while !fifo.is_empty() {
for i in 3..graph.len(){ if let Some((id, lvl)) = fifo.pop_front() {
assert!( graph[i].iter().map(|e| e.flow).sum::<i32>() == 0); if level[id] == -1 {
for e in graph[i].iter(){ level[id] = lvl;
assert!(e.flow + graph[e.v][e.rev].flow == 0); for e in graph[id].iter() {
} if e.c - e.flow > 0 {
} fifo.push_back((e.v, lvl + 1));
}
//it solves the matching problem }
for i in 0..left_cap_vec.len(){ }
assert!(left_cap_vec[i] as i32 == }
graph[i+2].iter().map(|e| max(0,e.flow)).sum::<i32>()); }
} if level[1] == -1 {
for i in 0..right_cap_vec.len(){ //There is no residual flow
assert!(right_cap_vec[i] as i32 == break;
graph[i+2+left_cap_vec.len()].iter() }
.map(|e| max(0,e.flow)).sum::<i32>());
}
//Now we run DFS respecting the level array
let mut next_nbd = vec![0; nb_vertices];
let mut lifo = VecDeque::new();
assoc_table let flow_upper_bound;
if let Some(x) = left_cap_vec.iter().max() {
flow_upper_bound = *x as i32;
} else {
flow_upper_bound = 0;
assert!(false);
}
lifo.push_back((0, flow_upper_bound));
loop {
if let Some((id_tmp, f_tmp)) = lifo.back() {
let id = *id_tmp;
let f = *f_tmp;
if id == 1 {
//The DFS reached the sink, we can add a
//residual flow.
lifo.pop_back();
while !lifo.is_empty() {
if let Some((id, _)) = lifo.pop_back() {
let nbd = next_nbd[id];
graph[id][nbd].flow += f;
let id_v = graph[id][nbd].v;
let nbd_v = graph[id][nbd].rev;
graph[id_v][nbd_v].flow -= f;
}
}
lifo.push_back((0, flow_upper_bound));
continue;
}
//else we did not reach the sink
let nbd = next_nbd[id];
if nbd >= graph[id].len() {
//There is nothing to explore from id anymore
lifo.pop_back();
if let Some((parent, _)) = lifo.back() {
next_nbd[*parent] += 1;
}
continue;
}
//else we can try to send flow from id to its nbd
let new_flow = min(f, graph[id][nbd].c - graph[id][nbd].flow);
if level[graph[id][nbd].v] <= level[id] || new_flow == 0 {
//We cannot send flow to nbd.
next_nbd[id] += 1;
continue;
}
//otherwise, we send flow to nbd.
lifo.push_back((graph[id][nbd].v, new_flow));
} else {
break;
}
}
}
//We return the association
let assoc_table = (0..left_cap_vec.len())
.map(|id| {
graph[id + 2]
.iter()
.filter(|e| e.flow > 0)
.map(|e| e.v - 2 - left_cap_vec.len())
.collect()
})
.collect();
//consistency check
//it is a flow
for i in 3..graph.len() {
assert!(graph[i].iter().map(|e| e.flow).sum::<i32>() == 0);
for e in graph[i].iter() {
assert!(e.flow + graph[e.v][e.rev].flow == 0);
}
}
//it solves the matching problem
for i in 0..left_cap_vec.len() {
assert!(left_cap_vec[i] as i32 == graph[i + 2].iter().map(|e| max(0, e.flow)).sum::<i32>());
}
for i in 0..right_cap_vec.len() {
assert!(
right_cap_vec[i] as i32
== graph[i + 2 + left_cap_vec.len()]
.iter()
.map(|e| max(0, e.flow))
.sum::<i32>()
);
}
assoc_table
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_flow() {
let left_vec = vec![3;8];
let right_vec = vec![0,4,8,4,8];
//There are asserts in the function that computes the flow
let _ = dinic_compute_matching(left_vec, right_vec);
}
//maybe add tests relative to the matching optilization ? #[test]
fn test_flow() {
let left_vec = vec![3; 8];
let right_vec = vec![0, 4, 8, 4, 8];
//There are asserts in the function that computes the flow
let _ = dinic_compute_matching(left_vec, right_vec);
}
//maybe add tests relative to the matching optilization ?
} }