system metrics improvements #726

Merged
lx merged 5 commits from peer-metrics into main 2024-02-20 15:35:13 +00:00
Showing only changes of commit b868493da9 - Show all commits

View file

@ -3,6 +3,7 @@ use std::time::{Duration, Instant};
use opentelemetry::{global, metrics::*, KeyValue};
use crate::ring::Ring;
use crate::system::{ClusterHealthStatus, System};
/// TableMetrics reference all counter used for metrics
@ -25,6 +26,10 @@ pub struct SystemMetrics {
pub(crate) _partitions: ValueObserver<u64>,
pub(crate) _partitions_quorum: ValueObserver<u64>,
pub(crate) _partitions_all_ok: ValueObserver<u64>,
// Status report for individual cluster nodes
pub(crate) _layout_node_connected: ValueObserver<u64>,
pub(crate) _layout_node_disconnected_time: ValueObserver<u64>,
}
impl SystemMetrics {
@ -204,6 +209,95 @@ impl SystemMetrics {
)
.init()
},
// Status report for individual cluster nodes
_layout_node_connected: {
let system = system.clone();
meter
.u64_value_observer("cluster_layout_node_connected", move |observer| {
let ring: Arc<Ring> = system.ring.borrow().clone();
let nodes = system.get_known_nodes();
for (id, _, config) in ring.layout.roles.items().iter() {
if let Some(role) = &config.0 {
let mut kv = vec![
KeyValue::new("id", format!("{:?}", id)),
KeyValue::new("role_zone", role.zone.clone()),
];
match role.capacity {
Some(cap) => {
kv.push(KeyValue::new("role_capacity", cap as i64));
kv.push(KeyValue::new("role_gateway", 0));
}
None => {
kv.push(KeyValue::new("role_gateway", 1));
}
}
let value;
if let Some(node) = nodes.iter().find(|n| n.id == *id) {
value = if node.is_up { 1 } else { 0 };
// TODO: if we add address and hostname, and those change, we
// get duplicate metrics, due to bad otel aggregation :(
// Can probably be fixed when we upgrade opentelemetry
// kv.push(KeyValue::new("address", node.addr.to_string()));
// kv.push(KeyValue::new(
// "hostname",
// node.status.hostname.clone(),
// ));
} else {
value = 0;
}
observer.observe(value, &kv);
}
}
})
.with_description("Connection status for nodes in the cluster layout")
.init()
},
_layout_node_disconnected_time: {
let system = system.clone();
meter
.u64_value_observer("cluster_layout_node_disconnected_time", move |observer| {
let ring: Arc<Ring> = system.ring.borrow().clone();
let nodes = system.get_known_nodes();
for (id, _, config) in ring.layout.roles.items().iter() {
if let Some(role) = &config.0 {
let mut kv = vec![
KeyValue::new("id", format!("{:?}", id)),
KeyValue::new("role_zone", role.zone.clone()),
];
match role.capacity {
Some(cap) => {
kv.push(KeyValue::new("role_capacity", cap as i64));
kv.push(KeyValue::new("role_gateway", 0));
}
None => {
kv.push(KeyValue::new("role_gateway", 1));
}
}
if let Some(node) = nodes.iter().find(|n| n.id == *id) {
// TODO: see comment above
// kv.push(KeyValue::new("address", node.addr.to_string()));
// kv.push(KeyValue::new(
// "hostname",
// node.status.hostname.clone(),
// ));
if node.is_up {
observer.observe(0, &kv);
} else if let Some(secs) = node.last_seen_secs_ago {
observer.observe(secs, &kv);
}
}
}
}
})
.with_description(
"Time (in seconds) since last connection to nodes in the cluster layout",
)
.init()
},
}
}
}