Implement /health admin API endpoint to check node health #440

Merged
lx merged 5 commits from admin-health-api into main 2022-12-11 17:25:29 +00:00
7 changed files with 156 additions and 95 deletions
Showing only changes of commit 280d1be7b1 - Show all commits

View file

@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::fmt::Write; use std::fmt::Write;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -17,8 +16,7 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::layout::NodeRoleV; use garage_rpc::system::ClusterHealthStatus;
use garage_util::data::Uuid;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use crate::generic_server::*; use crate::generic_server::*;
@ -80,93 +78,62 @@ impl AdminApiServer {
.body(Body::empty())?) .body(Body::empty())?)
} }
fn handle_health(&self) -> Result<Response<Body>, Error> { fn handle_health(&self, format: Option<&str>) -> Result<Response<Body>, Error> {
let ring: Arc<_> = self.garage.system.ring.borrow().clone(); let health = self.garage.system.health();
let quorum = self.garage.replication_mode.write_quorum();
let replication_factor = self.garage.replication_mode.replication_factor();
let nodes = self let (status, status_str) = match health.status {
.garage ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
.system ClusterHealthStatus::Degraded => (
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let n_nodes_connected = nodes.iter().filter(|(_, n)| n.is_up).count();
let storage_nodes = ring
.layout
.roles
.items()
.iter()
.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
.collect::<Vec<_>>();
let n_storage_nodes_ok = storage_nodes
.iter()
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
let partitions = ring.partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
let pn = ring.get_nodes(h, ring.replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
})
.collect::<Vec<usize>>();
let n_partitions_full_ok = partitions_n_up
.iter()
.filter(|c| **c == replication_factor)
.count();
let n_partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
let (status, status_str) = if n_partitions_quorum == partitions.len()
&& n_storage_nodes_ok == storage_nodes.len()
{
(StatusCode::OK, "Garage is fully operational")
} else if n_partitions_quorum == partitions.len() {
(
StatusCode::OK, StatusCode::OK,
"Garage is operational but some storage nodes are unavailable", "Garage is operational but some storage nodes are unavailable",
) ),
} else { ClusterHealthStatus::Unavailable => (
(
StatusCode::SERVICE_UNAVAILABLE, StatusCode::SERVICE_UNAVAILABLE,
"Quorum is not available for some/all partitions, reads and writes will fail", "Quorum is not available for some/all partitions, reads and writes will fail",
) ),
}; };
let resp = Response::builder().status(status);
if matches!(format, Some("json")) {
let resp_json =
serde_json::to_string_pretty(&health).map_err(garage_util::error::Error::from)?;
Ok(resp
.header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(resp_json))?)
} else {
let mut buf = status_str.to_string(); let mut buf = status_str.to_string();
writeln!( writeln!(
&mut buf, &mut buf,
"\nAll nodes: {} connected, {} known", "\nAll nodes: {} connected, {} known",
n_nodes_connected, health.connected_nodes, health.known_nodes,
nodes.len()
) )
.unwrap(); .unwrap();
writeln!( writeln!(
&mut buf, &mut buf,
"Storage nodes: {} connected, {} in layout", "Storage nodes: {} connected, {} in layout",
n_storage_nodes_ok, health.storage_nodes_ok, health.storage_nodes
storage_nodes.len() )
.unwrap();
writeln!(&mut buf, "Number of partitions: {}", health.partitions).unwrap();
writeln!(
&mut buf,
"Partitions with quorum: {}",
health.partitions_quorum
) )
.unwrap(); .unwrap();
writeln!(&mut buf, "Number of partitions: {}", partitions.len()).unwrap();
writeln!(&mut buf, "Partitions with quorum: {}", n_partitions_quorum).unwrap();
writeln!( writeln!(
&mut buf, &mut buf,
"Partitions with all nodes available: {}", "Partitions with all nodes available: {}",
n_partitions_full_ok health.partitions_all_ok
) )
.unwrap(); .unwrap();
Ok(Response::builder() Ok(resp
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain") .header(http::header::CONTENT_TYPE, "text/plain")
.body(Body::from(buf))?) .body(Body::from(buf))?)
} }
}
fn handle_metrics(&self) -> Result<Response<Body>, Error> { fn handle_metrics(&self) -> Result<Response<Body>, Error> {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -240,7 +207,7 @@ impl ApiHandler for AdminApiServer {
match endpoint { match endpoint {
Endpoint::Options => self.handle_options(&req), Endpoint::Options => self.handle_options(&req),
Endpoint::Health => self.handle_health(), Endpoint::Health { format } => self.handle_health(format.as_deref()),
Endpoint::Metrics => self.handle_metrics(), Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await, Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,

View file

@ -17,7 +17,9 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint { pub enum Endpoint {
Options, Options,
Health, Health {
format: Option<String>,
},
Metrics, Metrics,
GetClusterStatus, GetClusterStatus,
ConnectClusterNodes, ConnectClusterNodes,
@ -90,7 +92,7 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [ let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options, OPTIONS _ => Options,
GET "/health" => Health, GET "/health" => Health (query_opt::format),
GET "/metrics" => Metrics, GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus, GET "/v0/status" => GetClusterStatus,
POST "/v0/connect" => ConnectClusterNodes, POST "/v0/connect" => ConnectClusterNodes,
@ -133,7 +135,7 @@ impl Endpoint {
/// Get the kind of authorization which is required to perform the operation. /// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization { pub fn authorization_type(&self) -> Authorization {
match self { match self {
Self::Health => Authorization::None, Self::Health { .. } => Authorization::None,
Self::Metrics => Authorization::MetricsToken, Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken, _ => Authorization::AdminToken,
} }
@ -141,6 +143,7 @@ impl Endpoint {
} }
generateQueryParameters! { generateQueryParameters! {
"format" => format,
"id" => id, "id" => id,
"search" => search, "search" => search,
"globalAlias" => global_alias, "globalAlias" => global_alias,

View file

@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
use garage_util::error::*; use garage_util::error::*;
use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_block::manager::*; use garage_block::manager::*;
use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication; use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
@ -167,12 +167,7 @@ impl Garage {
.expect("Invalid replication_mode in config file."); .expect("Invalid replication_mode in config file.");
info!("Initialize membership management system..."); info!("Initialize membership management system...");
let system = System::new( let system = System::new(network_key, background.clone(), replication_mode, &config)?;
network_key,
background.clone(),
replication_mode.replication_factor(),
&config,
)?;
let data_rep_param = TableShardedReplication { let data_rep_param = TableShardedReplication {
system: system.clone(), system: system.clone(),

View file

@ -9,6 +9,7 @@ mod consul;
mod kubernetes; mod kubernetes;
pub mod layout; pub mod layout;
pub mod replication_mode;
pub mod ring; pub mod ring;
pub mod system; pub mod system;

View file

@ -35,6 +35,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::*; use crate::layout::*;
use crate::replication_mode::*;
use crate::ring::*; use crate::ring::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -102,6 +103,7 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
replication_mode: ReplicationMode,
replication_factor: usize, replication_factor: usize,
/// The ring /// The ring
@ -136,6 +138,37 @@ pub struct KnownNodeInfo {
pub status: NodeStatus, pub status: NodeStatus,
} }
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ClusterHealth {
/// The current health status of the cluster (see below)
pub status: ClusterHealthStatus,
/// Number of nodes already seen once in the cluster
pub known_nodes: usize,
/// Number of nodes currently connected
pub connected_nodes: usize,
/// Number of storage nodes declared in the current layout
pub storage_nodes: usize,
/// Number of storage nodes currently connected
pub storage_nodes_ok: usize,
/// Number of partitions in the layout
pub partitions: usize,
/// Number of partitions for which we have a quorum of connected nodes
pub partitions_quorum: usize,
/// Number of partitions for which all storage nodes are connected
pub partitions_all_ok: usize,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ClusterHealthStatus {
/// All nodes are available
Healthy,
/// Some storage nodes are unavailable, but quorum is stil
/// achieved for all partitions
Degraded,
/// Quorum is not available for some partitions
Unavailable,
}
pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> { pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
let mut pubkey_file = metadata_dir.to_path_buf(); let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub"); pubkey_file.push("node_key.pub");
@ -200,9 +233,11 @@ impl System {
pub fn new( pub fn new(
network_key: NetworkKey, network_key: NetworkKey,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
replication_factor: usize, replication_mode: ReplicationMode,
config: &Config, config: &Config,
) -> Result<Arc<Self>, Error> { ) -> Result<Arc<Self>, Error> {
let replication_factor = replication_mode.replication_factor();
let node_key = let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!( info!(
@ -324,6 +359,7 @@ impl System {
config.rpc_timeout_msec.map(Duration::from_millis), config.rpc_timeout_msec.map(Duration::from_millis),
), ),
system_endpoint, system_endpoint,
replication_mode,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@ -429,6 +465,67 @@ impl System {
} }
} }
pub fn health(&self) -> ClusterHealth {
let ring: Arc<_> = self.ring.borrow().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
let nodes = self
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
let storage_nodes = ring
.layout
.roles
.items()
.iter()
.filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
.collect::<Vec<_>>();
let storage_nodes_ok = storage_nodes
.iter()
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
let partitions = ring.partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
let pn = ring.get_nodes(h, ring.replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
})
.collect::<Vec<usize>>();
let partitions_all_ok = partitions_n_up
.iter()
.filter(|c| **c == replication_factor)
.count();
let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count();
let status =
if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() {
ClusterHealthStatus::Healthy
} else if partitions_quorum == partitions.len() {
ClusterHealthStatus::Degraded
} else {
ClusterHealthStatus::Unavailable
};
ClusterHealth {
status,
known_nodes: nodes.len(),
connected_nodes,
storage_nodes: storage_nodes.len(),
storage_nodes_ok,
partitions: partitions.len(),
partitions_quorum,
partitions_all_ok,
}
}
// ---- INTERNALS ---- // ---- INTERNALS ----
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]

View file

@ -1,10 +1,8 @@
mod parameters; mod parameters;
mod fullcopy; mod fullcopy;
mod mode;
mod sharded; mod sharded;
pub use fullcopy::TableFullReplication; pub use fullcopy::TableFullReplication;
pub use mode::ReplicationMode;
pub use parameters::*; pub use parameters::*;
pub use sharded::TableShardedReplication; pub use sharded::TableShardedReplication;