garage_rpc: reorder functions in layout.rs

This commit is contained in:
Alex 2023-11-08 13:11:13 +01:00
parent f4d3905d15
commit 0962313ebd
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -278,11 +278,119 @@ impl ClusterLayout {
ret ret
} }
// ===================== accessors ======================
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
&self.node_id_vec[..]
}
pub fn num_nodes(&self) -> usize {
self.node_id_vec.len()
}
/// Returns the role of a node in the layout
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v),
_ => None,
}
}
/// Given a node uuids, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
match self.node_role(uuid) {
Some(NodeRole {
capacity: Some(cap),
zone: _,
tags: _,
}) => Ok(*cap),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
)),
}
}
/// Returns the number of partitions associated to this node in the ring
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
for (i, id) in self.node_id_vec.iter().enumerate() {
if id == uuid {
let mut count = 0;
for nod in self.ring_assignment_data.iter() {
if i as u8 == *nod {
count += 1
}
}
return Ok(count);
}
}
Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
))
}
// ===================== internal information extractors ======================
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
fn nongateway_nodes(&self) -> Vec<Uuid> {
let mut result = Vec::<Uuid>::new();
for uuid in self.node_id_vec.iter() {
match self.node_role(uuid) {
Some(role) if role.capacity.is_some() => result.push(*uuid),
_ => (),
}
}
result
}
/// Given a node uuids, this function returns the label of its zone
fn get_node_zone(&self, uuid: &Uuid) -> Result<&str, Error> {
match self.node_role(uuid) {
Some(role) => Ok(&role.zone),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the cluster.".into(),
)),
}
}
/// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0;
for uuid in self.nongateway_nodes().iter() {
total_capacity += self.get_node_capacity(uuid)?;
}
Ok(total_capacity)
}
/// Returns the effective value of the zone_redundancy parameter
fn effective_zone_redundancy(&self) -> usize {
match self.parameters.zone_redundancy {
ZoneRedundancy::AtLeast(v) => v,
ZoneRedundancy::Maximum => {
let n_zones = self
.roles
.items()
.iter()
.filter_map(|(_, _, role)| role.0.as_ref().map(|x| x.zone.as_str()))
.collect::<HashSet<&str>>()
.len();
std::cmp::min(n_zones, self.replication_factor)
}
}
}
fn calculate_staging_hash(&self) -> Hash { fn calculate_staging_hash(&self) -> Hash {
let hashed_tuple = (&self.staging_roles, &self.staging_parameters); let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
} }
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &ClusterLayout) -> bool { pub fn merge(&mut self, other: &ClusterLayout) -> bool {
match other.version.cmp(&self.version) { match other.version.cmp(&self.version) {
Ordering::Greater => { Ordering::Greater => {
@ -359,108 +467,6 @@ To know the correct value of the new layout version, invoke `garage layout show`
Ok(self) Ok(self)
} }
/// Returns a list of IDs of nodes that currently have
/// a role in the cluster
pub fn node_ids(&self) -> &[Uuid] {
&self.node_id_vec[..]
}
pub fn num_nodes(&self) -> usize {
self.node_id_vec.len()
}
/// Returns the role of a node in the layout
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v),
_ => None,
}
}
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
fn nongateway_nodes(&self) -> Vec<Uuid> {
let mut result = Vec::<Uuid>::new();
for uuid in self.node_id_vec.iter() {
match self.node_role(uuid) {
Some(role) if role.capacity.is_some() => result.push(*uuid),
_ => (),
}
}
result
}
/// Given a node uuids, this function returns the label of its zone
fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
match self.node_role(uuid) {
Some(role) => Ok(role.zone.clone()),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the cluster.".into(),
)),
}
}
/// Given a node uuids, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
match self.node_role(uuid) {
Some(NodeRole {
capacity: Some(cap),
zone: _,
tags: _,
}) => Ok(*cap),
_ => Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
)),
}
}
/// Returns the number of partitions associated to this node in the ring
pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
for (i, id) in self.node_id_vec.iter().enumerate() {
if id == uuid {
let mut count = 0;
for nod in self.ring_assignment_data.iter() {
if i as u8 == *nod {
count += 1
}
}
return Ok(count);
}
}
Err(Error::Message(
"The Uuid does not correspond to a node present in the \
cluster or this node does not have a positive capacity."
.into(),
))
}
/// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0;
for uuid in self.nongateway_nodes().iter() {
total_capacity += self.get_node_capacity(uuid)?;
}
Ok(total_capacity)
}
/// Returns the effective value of the zone_redundancy parameter
fn effective_zone_redundancy(&self) -> usize {
match self.parameters.zone_redundancy {
ZoneRedundancy::AtLeast(v) => v,
ZoneRedundancy::Maximum => {
let n_zones = self
.roles
.items()
.iter()
.filter_map(|(_, _, role)| role.0.as_ref().map(|x| x.zone.as_str()))
.collect::<HashSet<&str>>()
.len();
std::cmp::min(n_zones, self.replication_factor)
}
}
}
/// Check a cluster layout for internal consistency /// Check a cluster layout for internal consistency
/// (assignment, roles, parameters, partition size) /// (assignment, roles, parameters, partition size)
/// returns true if consistent, false if error /// returns true if consistent, false if error
@ -574,12 +580,9 @@ To know the correct value of the new layout version, invoke `garage layout show`
Ok(()) Ok(())
} }
}
// ==================================================================================== // ================== updates to layout, internals ===================
// Implementation of the ClusterLayout methods related to the assignment algorithm.
impl ClusterLayout {
/// This function calculates a new partition-to-node assignment. /// This function calculates a new partition-to-node assignment.
/// The computed assignment respects the node replication factor /// The computed assignment respects the node replication factor
/// and the zone redundancy parameter It maximizes the capacity of a /// and the zone redundancy parameter It maximizes the capacity of a
@ -867,7 +870,7 @@ impl ClusterLayout {
} }
for n in 0..self.nongateway_nodes().len() { for n in 0..self.nongateway_nodes().len() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?]; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
for p in 0..NB_PARTITIONS { for p in 0..NB_PARTITIONS {
if !exclude_assoc.contains(&(p, n)) { if !exclude_assoc.contains(&(p, n)) {
@ -913,7 +916,7 @@ impl ClusterLayout {
// The algorithm is such that it will start with the flow that we just computed // The algorithm is such that it will start with the flow that we just computed
// and find ameliorating paths from that. // and find ameliorating paths from that.
for (p, n) in exclude_edge.iter() { for (p, n) in exclude_edge.iter() {
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?; g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
} }
g.compute_maximal_flow()?; g.compute_maximal_flow()?;
@ -933,7 +936,7 @@ impl ClusterLayout {
let mut cost = CostFunction::new(); let mut cost = CostFunction::new();
for (p, assoc_p) in prev_assign.iter().enumerate() { for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() { for n in assoc_p.iter() {
let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?];
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1); cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
} }
} }
@ -1035,7 +1038,7 @@ impl ClusterLayout {
let mut old_zones_of_p = Vec::<usize>::new(); let mut old_zones_of_p = Vec::<usize>::new();
for n in prev_assign[p].iter() { for n in prev_assign[p].iter() {
old_zones_of_p old_zones_of_p
.push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]); .push(zone_to_id[self.get_node_zone(&self.node_id_vec[*n])?]);
} }
if !old_zones_of_p.contains(&z) { if !old_zones_of_p.contains(&z) {
new_partitions_zone[z] += 1; new_partitions_zone[z] += 1;