diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 9beeda1f..f86ed599 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::fmt::Write; use std::net::SocketAddr; use std::sync::Arc; @@ -17,8 +16,7 @@ use opentelemetry_prometheus::PrometheusExporter; use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; -use garage_rpc::layout::NodeRoleV; -use garage_util::data::Uuid; +use garage_rpc::system::ClusterHealthStatus; use garage_util::error::Error as GarageError; use crate::generic_server::*; @@ -80,92 +78,61 @@ impl AdminApiServer { .body(Body::empty())?) } - fn handle_health(&self) -> Result, Error> { - let ring: Arc<_> = self.garage.system.ring.borrow().clone(); - let quorum = self.garage.replication_mode.write_quorum(); - let replication_factor = self.garage.replication_mode.replication_factor(); + fn handle_health(&self, format: Option<&str>) -> Result, Error> { + let health = self.garage.system.health(); - let nodes = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - let n_nodes_connected = nodes.iter().filter(|(_, n)| n.is_up).count(); - - let storage_nodes = ring - .layout - .roles - .items() - .iter() - .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) - .collect::>(); - let n_storage_nodes_ok = storage_nodes - .iter() - .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count(); - - let partitions = ring.partitions(); - let partitions_n_up = partitions - .iter() - .map(|(_, h)| { - let pn = ring.get_nodes(h, ring.replication_factor); - pn.iter() - .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) - .count() - }) - .collect::>(); - let n_partitions_full_ok = partitions_n_up - .iter() - .filter(|c| **c == replication_factor) - .count(); - let n_partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); - - let (status, status_str) = if n_partitions_quorum == partitions.len() - && n_storage_nodes_ok == storage_nodes.len() - { - (StatusCode::OK, "Garage is fully operational") - } else if n_partitions_quorum == partitions.len() { - ( + let (status, status_str) = match health.status { + ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), + ClusterHealthStatus::Degraded => ( StatusCode::OK, "Garage is operational but some storage nodes are unavailable", - ) - } else { - ( + ), + ClusterHealthStatus::Unavailable => ( StatusCode::SERVICE_UNAVAILABLE, "Quorum is not available for some/all partitions, reads and writes will fail", - ) + ), }; - let mut buf = status_str.to_string(); - writeln!( - &mut buf, - "\nAll nodes: {} connected, {} known", - n_nodes_connected, - nodes.len() - ) - .unwrap(); - writeln!( - &mut buf, - "Storage nodes: {} connected, {} in layout", - n_storage_nodes_ok, - storage_nodes.len() - ) - .unwrap(); - writeln!(&mut buf, "Number of partitions: {}", partitions.len()).unwrap(); - writeln!(&mut buf, "Partitions with quorum: {}", n_partitions_quorum).unwrap(); - writeln!( - &mut buf, - "Partitions with all nodes available: {}", - n_partitions_full_ok - ) - .unwrap(); + let resp = Response::builder().status(status); - Ok(Response::builder() - .status(status) - .header(http::header::CONTENT_TYPE, "text/plain") - .body(Body::from(buf))?) + if matches!(format, Some("json")) { + let resp_json = + serde_json::to_string_pretty(&health).map_err(garage_util::error::Error::from)?; + Ok(resp + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(resp_json))?) + } else { + let mut buf = status_str.to_string(); + writeln!( + &mut buf, + "\nAll nodes: {} connected, {} known", + health.connected_nodes, health.known_nodes, + ) + .unwrap(); + writeln!( + &mut buf, + "Storage nodes: {} connected, {} in layout", + health.storage_nodes_ok, health.storage_nodes + ) + .unwrap(); + writeln!(&mut buf, "Number of partitions: {}", health.partitions).unwrap(); + writeln!( + &mut buf, + "Partitions with quorum: {}", + health.partitions_quorum + ) + .unwrap(); + writeln!( + &mut buf, + "Partitions with all nodes available: {}", + health.partitions_all_ok + ) + .unwrap(); + + Ok(resp + .header(http::header::CONTENT_TYPE, "text/plain") + .body(Body::from(buf))?) + } } fn handle_metrics(&self) -> Result, Error> { @@ -240,7 +207,7 @@ impl ApiHandler for AdminApiServer { match endpoint { Endpoint::Options => self.handle_options(&req), - Endpoint::Health => self.handle_health(), + Endpoint::Health { format } => self.handle_health(format.as_deref()), Endpoint::Metrics => self.handle_metrics(), Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await, diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs index 14411f75..6ffcc131 100644 --- a/src/api/admin/router.rs +++ b/src/api/admin/router.rs @@ -17,7 +17,9 @@ router_match! {@func #[derive(Debug, Clone, PartialEq, Eq)] pub enum Endpoint { Options, - Health, + Health { + format: Option, + }, Metrics, GetClusterStatus, ConnectClusterNodes, @@ -90,7 +92,7 @@ impl Endpoint { let res = router_match!(@gen_path_parser (req.method(), path, query) [ OPTIONS _ => Options, - GET "/health" => Health, + GET "/health" => Health (query_opt::format), GET "/metrics" => Metrics, GET "/v0/status" => GetClusterStatus, POST "/v0/connect" => ConnectClusterNodes, @@ -133,7 +135,7 @@ impl Endpoint { /// Get the kind of authorization which is required to perform the operation. pub fn authorization_type(&self) -> Authorization { match self { - Self::Health => Authorization::None, + Self::Health { .. } => Authorization::None, Self::Metrics => Authorization::MetricsToken, _ => Authorization::AdminToken, } @@ -141,6 +143,7 @@ impl Endpoint { } generateQueryParameters! { + "format" => format, "id" => id, "search" => search, "globalAlias" => global_alias, diff --git a/src/model/garage.rs b/src/model/garage.rs index c2aabea1..e34d034f 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -8,10 +8,10 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; +use garage_rpc::replication_mode::ReplicationMode; use garage_rpc::system::System; use garage_block::manager::*; -use garage_table::replication::ReplicationMode; use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -167,12 +167,7 @@ impl Garage { .expect("Invalid replication_mode in config file."); info!("Initialize membership management system..."); - let system = System::new( - network_key, - background.clone(), - replication_mode.replication_factor(), - &config, - )?; + let system = System::new(network_key, background.clone(), replication_mode, &config)?; let data_rep_param = TableShardedReplication { system: system.clone(), diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 92caf75d..86f63568 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -9,6 +9,7 @@ mod consul; mod kubernetes; pub mod layout; +pub mod replication_mode; pub mod ring; pub mod system; diff --git a/src/table/replication/mode.rs b/src/rpc/replication_mode.rs similarity index 100% rename from src/table/replication/mode.rs rename to src/rpc/replication_mode.rs diff --git a/src/rpc/system.rs b/src/rpc/system.rs index d6576f20..2c6f14fd 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -35,6 +35,7 @@ use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; +use crate::replication_mode::*; use crate::ring::*; use crate::rpc_helper::*; @@ -102,6 +103,7 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, + replication_mode: ReplicationMode, replication_factor: usize, /// The ring @@ -136,6 +138,37 @@ pub struct KnownNodeInfo { pub status: NodeStatus, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct ClusterHealth { + /// The current health status of the cluster (see below) + pub status: ClusterHealthStatus, + /// Number of nodes already seen once in the cluster + pub known_nodes: usize, + /// Number of nodes currently connected + pub connected_nodes: usize, + /// Number of storage nodes declared in the current layout + pub storage_nodes: usize, + /// Number of storage nodes currently connected + pub storage_nodes_ok: usize, + /// Number of partitions in the layout + pub partitions: usize, + /// Number of partitions for which we have a quorum of connected nodes + pub partitions_quorum: usize, + /// Number of partitions for which all storage nodes are connected + pub partitions_all_ok: usize, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum ClusterHealthStatus { + /// All nodes are available + Healthy, + /// Some storage nodes are unavailable, but quorum is stil + /// achieved for all partitions + Degraded, + /// Quorum is not available for some partitions + Unavailable, +} + pub fn read_node_id(metadata_dir: &Path) -> Result { let mut pubkey_file = metadata_dir.to_path_buf(); pubkey_file.push("node_key.pub"); @@ -200,9 +233,11 @@ impl System { pub fn new( network_key: NetworkKey, background: Arc, - replication_factor: usize, + replication_mode: ReplicationMode, config: &Config, ) -> Result, Error> { + let replication_factor = replication_mode.replication_factor(); + let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -324,6 +359,7 @@ impl System { config.rpc_timeout_msec.map(Duration::from_millis), ), system_endpoint, + replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -429,6 +465,67 @@ impl System { } } + pub fn health(&self) -> ClusterHealth { + let ring: Arc<_> = self.ring.borrow().clone(); + let quorum = self.replication_mode.write_quorum(); + let replication_factor = self.replication_factor; + + let nodes = self + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::>(); + let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + + let storage_nodes = ring + .layout + .roles + .items() + .iter() + .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some())) + .collect::>(); + let storage_nodes_ok = storage_nodes + .iter() + .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count(); + + let partitions = ring.partitions(); + let partitions_n_up = partitions + .iter() + .map(|(_, h)| { + let pn = ring.get_nodes(h, ring.replication_factor); + pn.iter() + .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + .count() + }) + .collect::>(); + let partitions_all_ok = partitions_n_up + .iter() + .filter(|c| **c == replication_factor) + .count(); + let partitions_quorum = partitions_n_up.iter().filter(|c| **c >= quorum).count(); + + let status = + if partitions_quorum == partitions.len() && storage_nodes_ok == storage_nodes.len() { + ClusterHealthStatus::Healthy + } else if partitions_quorum == partitions.len() { + ClusterHealthStatus::Degraded + } else { + ClusterHealthStatus::Unavailable + }; + + ClusterHealth { + status, + known_nodes: nodes.len(), + connected_nodes, + storage_nodes: storage_nodes.len(), + storage_nodes_ok, + partitions: partitions.len(), + partitions_quorum, + partitions_all_ok, + } + } + // ---- INTERNALS ---- #[cfg(feature = "consul-discovery")] diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs index 19e6772f..dfcb026a 100644 --- a/src/table/replication/mod.rs +++ b/src/table/replication/mod.rs @@ -1,10 +1,8 @@ mod parameters; mod fullcopy; -mod mode; mod sharded; pub use fullcopy::TableFullReplication; -pub use mode::ReplicationMode; pub use parameters::*; pub use sharded::TableShardedReplication;