diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs index 61f8fa79..10327e96 100644 --- a/src/rpc/metrics.rs +++ b/src/rpc/metrics.rs @@ -4,6 +4,8 @@ use opentelemetry::{global, metrics::*}; pub struct RpcMetrics { pub(crate) rpc_counter: Counter, pub(crate) rpc_timeout_counter: Counter, + pub(crate) rpc_watchdogs_started_counter: Counter, + pub(crate) rpc_watchdogs_preemption_counter: Counter, pub(crate) rpc_netapp_error_counter: Counter, pub(crate) rpc_garage_error_counter: Counter, @@ -21,6 +23,14 @@ impl RpcMetrics { .u64_counter("rpc.timeout_counter") .with_description("Number of RPC timeouts") .init(), + rpc_watchdogs_started_counter: meter + .u64_counter("rpc.watchdogs_started_counter") + .with_description("Number of RPC requests started with a watchdog") + .init(), + rpc_watchdogs_preemption_counter: meter + .u64_counter("rpc.watchdogs_preemption_counter") + .with_description("Number of RPC watchdogs which timed out and caused an extra RPC to be scheduled") + .init(), rpc_netapp_error_counter: meter .u64_counter("rpc.netapp_error_counter") .with_description("Number of communication errors (errors in the Netapp library)") diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index eb412912..6812a135 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -353,6 +353,7 @@ impl RpcHelper { // Reorder requests to priorize closeness / low latency let request_order = self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied()); + let layout_nodes_count = request_order.len(); // The send_all_at_once flag overrides the behaviour described // above and sends an RPC to every node from the get-go. This @@ -370,6 +371,11 @@ impl RpcHelper { let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false); let mut preemptive_watchdogs = FuturesUnordered::new(); let mut completed_node_ids = HashSet::::new(); + let metric_tags = [ + KeyValue::new("rpc_endpoint", endpoint.path().to_string()), + KeyValue::new("from", format!("{:?}", self.0.our_node_id)), + KeyValue::new("to", format!("{:?}", to)), + ]; // Build future for each request // They are not started now: they are added below in a FuturesUnordered @@ -399,7 +405,7 @@ impl RpcHelper { // The number of in-flight requests we want at the moment let mut target_outbound_count = if send_all_at_once { - requests.len() + layout_nodes_count } else { quorum }; @@ -411,6 +417,7 @@ impl RpcHelper { if let Some((fut, watchdog)) = requests.next() { if let Some(sleep) = watchdog { preemptive_watchdogs.push(sleep); + self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags); } resp_stream.push(fut); } else { @@ -451,8 +458,9 @@ impl RpcHelper { Some(WatchedRPCResult::TimedOut(to)) => { // A watchdog has triggered, meaning one of the active requests is taking too long // Note that we don't cancel watchdogs after requests complete, so we need to ignore those - if !completed_node_ids.contains(&to) { + if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) { target_outbound_count += 1; + self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags); } } None => break,