diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs index ad4aca2f..ffbef6df 100644 --- a/src/rpc/system_metrics.rs +++ b/src/rpc/system_metrics.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use opentelemetry::{global, metrics::*, KeyValue}; +use crate::ring::Ring; use crate::system::{ClusterHealthStatus, System}; /// TableMetrics reference all counter used for metrics @@ -25,6 +26,10 @@ pub struct SystemMetrics { pub(crate) _partitions: ValueObserver, pub(crate) _partitions_quorum: ValueObserver, pub(crate) _partitions_all_ok: ValueObserver, + + // Status report for individual cluster nodes + pub(crate) _layout_node_connected: ValueObserver, + pub(crate) _layout_node_disconnected_time: ValueObserver, } impl SystemMetrics { @@ -204,6 +209,95 @@ impl SystemMetrics { ) .init() }, + + // Status report for individual cluster nodes + _layout_node_connected: { + let system = system.clone(); + meter + .u64_value_observer("cluster_layout_node_connected", move |observer| { + let ring: Arc = system.ring.borrow().clone(); + let nodes = system.get_known_nodes(); + for (id, _, config) in ring.layout.roles.items().iter() { + if let Some(role) = &config.0 { + let mut kv = vec![ + KeyValue::new("id", format!("{:?}", id)), + KeyValue::new("role_zone", role.zone.clone()), + ]; + match role.capacity { + Some(cap) => { + kv.push(KeyValue::new("role_capacity", cap as i64)); + kv.push(KeyValue::new("role_gateway", 0)); + } + None => { + kv.push(KeyValue::new("role_gateway", 1)); + } + } + + let value; + if let Some(node) = nodes.iter().find(|n| n.id == *id) { + value = if node.is_up { 1 } else { 0 }; + // TODO: if we add address and hostname, and those change, we + // get duplicate metrics, due to bad otel aggregation :( + // Can probably be fixed when we upgrade opentelemetry + // kv.push(KeyValue::new("address", node.addr.to_string())); + // kv.push(KeyValue::new( + // "hostname", + // node.status.hostname.clone(), + // )); + } else { + value = 0; + } + + observer.observe(value, &kv); + } + } + }) + .with_description("Connection status for nodes in the cluster layout") + .init() + }, + _layout_node_disconnected_time: { + let system = system.clone(); + meter + .u64_value_observer("cluster_layout_node_disconnected_time", move |observer| { + let ring: Arc = system.ring.borrow().clone(); + let nodes = system.get_known_nodes(); + for (id, _, config) in ring.layout.roles.items().iter() { + if let Some(role) = &config.0 { + let mut kv = vec![ + KeyValue::new("id", format!("{:?}", id)), + KeyValue::new("role_zone", role.zone.clone()), + ]; + match role.capacity { + Some(cap) => { + kv.push(KeyValue::new("role_capacity", cap as i64)); + kv.push(KeyValue::new("role_gateway", 0)); + } + None => { + kv.push(KeyValue::new("role_gateway", 1)); + } + } + + if let Some(node) = nodes.iter().find(|n| n.id == *id) { + // TODO: see comment above + // kv.push(KeyValue::new("address", node.addr.to_string())); + // kv.push(KeyValue::new( + // "hostname", + // node.status.hostname.clone(), + // )); + if node.is_up { + observer.observe(0, &kv); + } else if let Some(secs) = node.last_seen_secs_ago { + observer.observe(secs, &kv); + } + } + } + } + }) + .with_description( + "Time (in seconds) since last connection to nodes in the cluster layout", + ) + .init() + }, } } }