WIP: Implement preemptive sends to alleviate slow RPC propagation #860
2 changed files with 20 additions and 2 deletions
|
@ -4,6 +4,8 @@ use opentelemetry::{global, metrics::*};
|
||||||
pub struct RpcMetrics {
|
pub struct RpcMetrics {
|
||||||
pub(crate) rpc_counter: Counter<u64>,
|
pub(crate) rpc_counter: Counter<u64>,
|
||||||
pub(crate) rpc_timeout_counter: Counter<u64>,
|
pub(crate) rpc_timeout_counter: Counter<u64>,
|
||||||
|
pub(crate) rpc_watchdogs_started_counter: Counter<u64>,
|
||||||
|
pub(crate) rpc_watchdogs_preemption_counter: Counter<u64>,
|
||||||
pub(crate) rpc_netapp_error_counter: Counter<u64>,
|
pub(crate) rpc_netapp_error_counter: Counter<u64>,
|
||||||
pub(crate) rpc_garage_error_counter: Counter<u64>,
|
pub(crate) rpc_garage_error_counter: Counter<u64>,
|
||||||
|
|
||||||
|
@ -21,6 +23,14 @@ impl RpcMetrics {
|
||||||
.u64_counter("rpc.timeout_counter")
|
.u64_counter("rpc.timeout_counter")
|
||||||
.with_description("Number of RPC timeouts")
|
.with_description("Number of RPC timeouts")
|
||||||
.init(),
|
.init(),
|
||||||
|
rpc_watchdogs_started_counter: meter
|
||||||
|
.u64_counter("rpc.watchdogs_started_counter")
|
||||||
|
.with_description("Number of RPC requests started with a watchdog")
|
||||||
|
.init(),
|
||||||
|
rpc_watchdogs_preemption_counter: meter
|
||||||
|
.u64_counter("rpc.watchdogs_preemption_counter")
|
||||||
|
.with_description("Number of RPC watchdogs which timed out and caused an extra RPC to be scheduled")
|
||||||
|
.init(),
|
||||||
rpc_netapp_error_counter: meter
|
rpc_netapp_error_counter: meter
|
||||||
.u64_counter("rpc.netapp_error_counter")
|
.u64_counter("rpc.netapp_error_counter")
|
||||||
.with_description("Number of communication errors (errors in the Netapp library)")
|
.with_description("Number of communication errors (errors in the Netapp library)")
|
||||||
|
|
|
@ -353,6 +353,7 @@ impl RpcHelper {
|
||||||
// Reorder requests to prioritize closeness / low latency
|
// Reorder requests to prioritize closeness / low latency
|
||||||
let request_order =
|
let request_order =
|
||||||
self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
|
self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
|
||||||
|
let layout_nodes_count = request_order.len();
|
||||||
|
|
||||||
// The send_all_at_once flag overrides the behaviour described
|
// The send_all_at_once flag overrides the behaviour described
|
||||||
// above and sends an RPC to every node from the get-go. This
|
// above and sends an RPC to every node from the get-go. This
|
||||||
|
@ -370,6 +371,11 @@ impl RpcHelper {
|
||||||
let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false);
|
let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false);
|
||||||
let mut preemptive_watchdogs = FuturesUnordered::new();
|
let mut preemptive_watchdogs = FuturesUnordered::new();
|
||||||
let mut completed_node_ids = HashSet::<Uuid>::new();
|
let mut completed_node_ids = HashSet::<Uuid>::new();
|
||||||
|
let metric_tags = [
|
||||||
|
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
|
||||||
|
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
|
||||||
|
KeyValue::new("to", format!("{:?}", to)),
|
||||||
|
];
|
||||||
|
|
||||||
// Build future for each request
|
// Build future for each request
|
||||||
// They are not started now: they are added below in a FuturesUnordered
|
// They are not started now: they are added below in a FuturesUnordered
|
||||||
|
@ -399,7 +405,7 @@ impl RpcHelper {
|
||||||
|
|
||||||
// The number of in-flight requests we want at the moment
|
// The number of in-flight requests we want at the moment
|
||||||
let mut target_outbound_count = if send_all_at_once {
|
let mut target_outbound_count = if send_all_at_once {
|
||||||
requests.len()
|
layout_nodes_count
|
||||||
} else {
|
} else {
|
||||||
quorum
|
quorum
|
||||||
};
|
};
|
||||||
|
@ -411,6 +417,7 @@ impl RpcHelper {
|
||||||
if let Some((fut, watchdog)) = requests.next() {
|
if let Some((fut, watchdog)) = requests.next() {
|
||||||
if let Some(sleep) = watchdog {
|
if let Some(sleep) = watchdog {
|
||||||
preemptive_watchdogs.push(sleep);
|
preemptive_watchdogs.push(sleep);
|
||||||
|
self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags);
|
||||||
}
|
}
|
||||||
resp_stream.push(fut);
|
resp_stream.push(fut);
|
||||||
} else {
|
} else {
|
||||||
|
@ -451,8 +458,9 @@ impl RpcHelper {
|
||||||
Some(WatchedRPCResult::TimedOut(to)) => {
|
Some(WatchedRPCResult::TimedOut(to)) => {
|
||||||
// A watchdog has triggered, meaning one of the active requests is taking too long
|
// A watchdog has triggered, meaning one of the active requests is taking too long
|
||||||
// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
|
// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
|
||||||
if !completed_node_ids.contains(&to) {
|
if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) {
|
||||||
target_outbound_count += 1;
|
target_outbound_count += 1;
|
||||||
|
self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
|
|
Loading…
Reference in a new issue