rpc: update system::health to take into account write sets for all partitions

This commit is contained in:
Alex 2023-11-27 12:10:21 +01:00
parent d6d239fc79
commit 78362140f5
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -1,5 +1,5 @@
//! Module containing structs related to membership management //! Module containing structs related to membership management
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -418,48 +418,61 @@ impl System {
} }
pub fn health(&self) -> ClusterHealth { pub fn health(&self) -> ClusterHealth {
// TODO: adapt this function to take into account layout history
// when estimating cluster health, and not just use current layout
let quorum = self.replication_mode.write_quorum(); let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
// Gather information about running nodes.
// Technically, `nodes` contains currently running nodes, as well
// as nodes that this Garage process has been connected to at least
// once since it started.
let nodes = self let nodes = self
.get_known_nodes() .get_known_nodes()
.into_iter() .into_iter()
.map(|n| (n.id, n)) .map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>(); .collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
let node_up = |x: &Uuid| nodes.get(x).map(|n| n.is_up).unwrap_or(false);
let layout = self.cluster_layout(); // acquires a rwlock // Acquire a rwlock read-lock to the current cluster layout
let layout = self.cluster_layout();
let storage_nodes = layout // Obtain information about nodes that have a role as storage nodes
.current() // in one of the active layout versions
.roles let mut storage_nodes = HashSet::<Uuid>::with_capacity(16);
.items() for ver in layout.versions.iter() {
.iter() storage_nodes.extend(
.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) ver.roles
.collect::<Vec<_>>(); .items()
let storage_nodes_ok = storage_nodes .iter()
.iter() .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .map(|(n, _, _)| *n),
.count(); )
}
let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();
// Determine the number of partitions that have:
// - a quorum of up nodes for all write sets (i.e. are available)
// - for which all nodes in all write sets are up (i.e. are fully healthy)
let partitions = layout.current().partitions().collect::<Vec<_>>(); let partitions = layout.current().partitions().collect::<Vec<_>>();
let partitions_n_up = partitions let mut partitions_quorum = 0;
.iter() let mut partitions_all_ok = 0;
.map(|(_, h)| { for (_, hash) in partitions.iter() {
let pn = layout.current().nodes_of(h, replication_factor); let write_sets = layout
pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .versions
.count() .iter()
}) .map(|x| x.nodes_of(hash, x.replication_factor));
.collect::<Vec<usize>>(); let has_quorum = write_sets
let partitions_all_ok = partitions_n_up .clone()
.iter() .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
.filter(|c| **c == replication_factor) let all_ok = write_sets.clone().all(|mut set| set.all(|x| node_up(&x)));
.count(); if has_quorum {
let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); partitions_quorum += 1;
}
if all_ok {
partitions_all_ok += 1;
}
}
// Determine overall cluster status
let status = let status =
if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy ClusterHealthStatus::Healthy