layout version: refactor get_node_zone
This commit is contained in:
parent
7f2541101f
commit
063294dd56
2 changed files with 25 additions and 26 deletions
|
@ -34,8 +34,8 @@ fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
|
||||||
zone_token.insert(z.clone(), 0);
|
zone_token.insert(z.clone(), 0);
|
||||||
}
|
}
|
||||||
for uuid in cl.nongateway_nodes() {
|
for uuid in cl.nongateway_nodes() {
|
||||||
let z = cl.get_node_zone(&uuid)?;
|
let z = cl.expect_get_node_zone(&uuid);
|
||||||
let c = cl.get_node_capacity(&uuid).unwrap();
|
let c = cl.expect_get_node_capacity(&uuid);
|
||||||
zone_token.insert(
|
zone_token.insert(
|
||||||
z.to_string(),
|
z.to_string(),
|
||||||
zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize),
|
zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize),
|
||||||
|
|
|
@ -70,6 +70,14 @@ impl LayoutVersion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Given a node uuids, this function returns the label of its zone if it has one
|
||||||
|
pub fn get_node_zone(&self, uuid: &Uuid) -> Option<&str> {
|
||||||
|
match self.node_role(uuid) {
|
||||||
|
Some(role) => Some(&role.zone),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the number of partitions associated to this node in the ring
|
/// Returns the number of partitions associated to this node in the ring
|
||||||
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
|
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
|
||||||
for (i, id) in self.node_id_vec.iter().enumerate() {
|
for (i, id) in self.node_id_vec.iter().enumerate() {
|
||||||
|
@ -129,28 +137,22 @@ impl LayoutVersion {
|
||||||
|
|
||||||
// ===================== internal information extractors ======================
|
// ===================== internal information extractors ======================
|
||||||
|
|
||||||
/// Given a node uuids, this function returns the label of its zone
|
pub(crate) fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {
|
||||||
pub(crate) fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> {
|
|
||||||
match self.node_role(uuid) {
|
|
||||||
Some(role) => Ok(&role.zone),
|
|
||||||
_ => Err(Error::Message(
|
|
||||||
"The Uuid does not correspond to a node present in the cluster.".into(),
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn expect_get_node_capacity(&self, uuid: &Uuid) -> u64 {
|
|
||||||
self.get_node_capacity(&uuid)
|
self.get_node_capacity(&uuid)
|
||||||
.expect("non-gateway node with zero capacity")
|
.expect("non-gateway node with zero capacity")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn expect_get_node_zone(&self, uuid: &Uuid) -> &str {
|
||||||
|
self.get_node_zone(&uuid).expect("node without a zone")
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the sum of capacities of non gateway nodes in the cluster
|
/// Returns the sum of capacities of non gateway nodes in the cluster
|
||||||
fn get_total_capacity(&self) -> Result<u64, Error> {
|
fn get_total_capacity(&self) -> u64 {
|
||||||
let mut total_capacity = 0;
|
let mut total_capacity = 0;
|
||||||
for uuid in self.nongateway_nodes() {
|
for uuid in self.nongateway_nodes() {
|
||||||
total_capacity += self.expect_get_node_capacity(&uuid);
|
total_capacity += self.expect_get_node_capacity(&uuid);
|
||||||
}
|
}
|
||||||
Ok(total_capacity)
|
total_capacity
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the effective value of the zone_redundancy parameter
|
/// Returns the effective value of the zone_redundancy parameter
|
||||||
|
@ -227,10 +229,7 @@ impl LayoutVersion {
|
||||||
// Check that every partition is spread over at least zone_redundancy zones.
|
// Check that every partition is spread over at least zone_redundancy zones.
|
||||||
let zones_of_p = nodes_of_p
|
let zones_of_p = nodes_of_p
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| {
|
.map(|n| self.expect_get_node_zone(&self.node_id_vec[*n as usize]))
|
||||||
self.get_node_zone(&self.node_id_vec[*n as usize])
|
|
||||||
.expect("Zone not found.")
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
if zones_of_p.iter().unique().count() < zone_redundancy {
|
if zones_of_p.iter().unique().count() < zone_redundancy {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
|
@ -516,7 +515,7 @@ impl LayoutVersion {
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut s_down = 1;
|
let mut s_down = 1;
|
||||||
let mut s_up = self.get_total_capacity()?;
|
let mut s_up = self.get_total_capacity();
|
||||||
while s_down + 1 < s_up {
|
while s_down + 1 < s_up {
|
||||||
g = self.generate_flow_graph(
|
g = self.generate_flow_graph(
|
||||||
(s_down + s_up) / 2,
|
(s_down + s_up) / 2,
|
||||||
|
@ -586,7 +585,7 @@ impl LayoutVersion {
|
||||||
}
|
}
|
||||||
for n in 0..self.nongateway_nodes().len() {
|
for n in 0..self.nongateway_nodes().len() {
|
||||||
let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]);
|
let node_capacity = self.expect_get_node_capacity(&self.node_id_vec[n]);
|
||||||
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
|
let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[n])];
|
||||||
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
|
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
|
||||||
for p in 0..NB_PARTITIONS {
|
for p in 0..NB_PARTITIONS {
|
||||||
if !exclude_assoc.contains(&(p, n)) {
|
if !exclude_assoc.contains(&(p, n)) {
|
||||||
|
@ -632,7 +631,7 @@ impl LayoutVersion {
|
||||||
// The algorithm is such that it will start with the flow that we just computed
|
// The algorithm is such that it will start with the flow that we just computed
|
||||||
// and find ameliorating paths from that.
|
// and find ameliorating paths from that.
|
||||||
for (p, n) in exclude_edge.iter() {
|
for (p, n) in exclude_edge.iter() {
|
||||||
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
|
let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])];
|
||||||
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
|
||||||
}
|
}
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
|
@ -652,7 +651,7 @@ impl LayoutVersion {
|
||||||
let mut cost = CostFunction::new();
|
let mut cost = CostFunction::new();
|
||||||
for (p, assoc_p) in prev_assign.iter().enumerate() {
|
for (p, assoc_p) in prev_assign.iter().enumerate() {
|
||||||
for n in assoc_p.iter() {
|
for n in assoc_p.iter() {
|
||||||
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
|
let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])];
|
||||||
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
|
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -707,7 +706,7 @@ impl LayoutVersion {
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
|
|
||||||
let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
|
let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
|
||||||
let total_cap = self.get_total_capacity()?;
|
let total_cap = self.get_total_capacity();
|
||||||
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
|
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
|
||||||
msg.push(format!(
|
msg.push(format!(
|
||||||
"Usable capacity / total cluster capacity: {} / {} ({:.1} %)",
|
"Usable capacity / total cluster capacity: {} / {} ({:.1} %)",
|
||||||
|
@ -754,7 +753,7 @@ impl LayoutVersion {
|
||||||
let mut old_zones_of_p = Vec::<usize>::new();
|
let mut old_zones_of_p = Vec::<usize>::new();
|
||||||
for n in prev_assign[p].iter() {
|
for n in prev_assign[p].iter() {
|
||||||
old_zones_of_p
|
old_zones_of_p
|
||||||
.push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]);
|
.push(zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]);
|
||||||
}
|
}
|
||||||
if !old_zones_of_p.contains(&z) {
|
if !old_zones_of_p.contains(&z) {
|
||||||
new_partitions_zone[z] += 1;
|
new_partitions_zone[z] += 1;
|
||||||
|
@ -796,7 +795,7 @@ impl LayoutVersion {
|
||||||
for z in 0..id_to_zone.len() {
|
for z in 0..id_to_zone.len() {
|
||||||
let mut nodes_of_z = Vec::<usize>::new();
|
let mut nodes_of_z = Vec::<usize>::new();
|
||||||
for n in 0..storing_nodes.len() {
|
for n in 0..storing_nodes.len() {
|
||||||
if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
|
if self.expect_get_node_zone(&self.node_id_vec[n]) == id_to_zone[z] {
|
||||||
nodes_of_z.push(n);
|
nodes_of_z.push(n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue