diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs index 61f8fa79..10327e96 100644 --- a/src/rpc/metrics.rs +++ b/src/rpc/metrics.rs @@ -4,6 +4,8 @@ use opentelemetry::{global, metrics::*}; pub struct RpcMetrics { pub(crate) rpc_counter: Counter<u64>, pub(crate) rpc_timeout_counter: Counter<u64>, + pub(crate) rpc_watchdogs_started_counter: Counter<u64>, + pub(crate) rpc_watchdogs_preemption_counter: Counter<u64>, pub(crate) rpc_netapp_error_counter: Counter<u64>, pub(crate) rpc_garage_error_counter: Counter<u64>, @@ -21,6 +23,14 @@ impl RpcMetrics { .u64_counter("rpc.timeout_counter") .with_description("Number of RPC timeouts") .init(), + rpc_watchdogs_started_counter: meter + .u64_counter("rpc.watchdogs_started_counter") + .with_description("Number of RPC requests started with a watchdog") + .init(), + rpc_watchdogs_preemption_counter: meter + .u64_counter("rpc.watchdogs_preemption_counter") + .with_description("Number of RPC watchdogs which timed out and caused an extra RPC to be scheduled") + .init(), rpc_netapp_error_counter: meter .u64_counter("rpc.netapp_error_counter") .with_description("Number of communication errors (errors in the Netapp library)") diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index ea3e5e76..de60bfd3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,5 +1,5 @@ //! 
Contain structs related to making RPCs -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -38,6 +38,8 @@ pub struct RequestStrategy { rs_quorum: Option<usize>, /// Send all requests at once rs_send_all_at_once: Option<bool>, + /// Start with enough RPCs to reach quorum, but send extras when some take too long + rs_preemptive_send: Option<bool>, /// Request priority rs_priority: RequestPriority, /// Custom timeout for this request @@ -58,6 +60,7 @@ impl Clone for RequestStrategy<()> { RequestStrategy { rs_quorum: self.rs_quorum, rs_send_all_at_once: self.rs_send_all_at_once, + rs_preemptive_send: self.rs_preemptive_send, rs_priority: self.rs_priority, rs_timeout: self.rs_timeout, rs_drop_on_complete: (), @@ -71,6 +74,7 @@ impl RequestStrategy<()> { RequestStrategy { rs_quorum: None, rs_send_all_at_once: None, + rs_preemptive_send: None, rs_priority: prio, rs_timeout: Timeout::Default, rs_drop_on_complete: (), @@ -81,6 +85,7 @@ impl RequestStrategy<()> { RequestStrategy { rs_quorum: self.rs_quorum, rs_send_all_at_once: self.rs_send_all_at_once, + rs_preemptive_send: self.rs_preemptive_send, rs_priority: self.rs_priority, rs_timeout: self.rs_timeout, rs_drop_on_complete: drop_on_complete, @@ -94,11 +99,16 @@ impl RequestStrategy { self.rs_quorum = Some(quorum); self } - /// Set quorum to be reached for request + /// Set flag to send all requests at once pub fn send_all_at_once(mut self, value: bool) -> Self { self.rs_send_all_at_once = Some(value); self } + /// Set flag to preemptively send extra requests after some wait + pub fn with_preemptive_send(mut self, value: bool) -> Self { + self.rs_preemptive_send = Some(value); + self + } /// Deactivate timeout for this request pub fn without_timeout(mut self) -> Self { self.rs_timeout = Timeout::None; @@ -115,6 +125,7 @@ impl RequestStrategy { RequestStrategy { rs_quorum: self.rs_quorum, rs_send_all_at_once: self.rs_send_all_at_once, + rs_preemptive_send: 
self.rs_preemptive_send, rs_priority: self.rs_priority, rs_timeout: self.rs_timeout, rs_drop_on_complete: (), @@ -335,29 +346,53 @@ impl RpcHelper { S: Send + 'static, { // Once quorum is reached, other requests don't matter. - // What we do here is only send the required number of requests + // The default here is to only send the required number of requests // to reach a quorum, priorizing nodes with the lowest latency. // When there are errors, we start new requests to compensate. - // TODO: this could be made more aggressive, e.g. if after 2x the - // average ping of a given request, the response is not yet received, - // preemptively send an additional request to any remaining nodes. - // Reorder requests to priorize closeness / low latency let request_order = self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied()); + let layout_nodes_count = request_order.len(); + + // The send_all_at_once flag overrides the behaviour described + // above and sends an RPC to every node from the get-go. This + // is more demanding on the network but also offers the best + // chance to reach quorum quickly. let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false); + // The preemptive_send flag is an attempt at compromise: we + // start by sending just enough to reach quorum, but associate + // each RPC to a watchdog which triggers after 2x the average + // ping for that peer. When a watchdog triggers, an extra RPC + // is sent as a preemptive "replacement" in case the slow node + // is having serious trouble and can't reply. This is + // overridden by send_all_at_once. 
+ let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false); + let mut preemptive_watchdogs = FuturesUnordered::new(); + let mut completed_node_ids = HashSet::<Uuid>::new(); + let metric_tags = [ + KeyValue::new("rpc_endpoint", endpoint.path().to_string()), + KeyValue::new("from", format!("{:?}", self.0.our_node_id)), + KeyValue::new("to", format!("{:?}", to)), + ]; + // Build future for each request // They are not started now: they are added below in a FuturesUnordered // object that will take care of polling them (see below) let msg = msg.into_req().map_err(garage_net::error::Error::from)?; - let mut requests = request_order.into_iter().map(|to| { + let mut requests = request_order.into_iter().map(|(avg_ping, to)| { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); let strategy = strategy.clone(); - async move { self2.call(&endpoint2, to, msg, strategy).await } + ( + async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }, + preemptive_send.then_some(async move { + tokio::time::sleep(2 * avg_ping).await; + to + }), + ) }); // Vectors in which success results and errors will be collected @@ -368,13 +403,26 @@ impl RpcHelper { // (for the moment none, they will be added in the loop below) let mut resp_stream = FuturesUnordered::new(); + // The number of in-flight requests we want at the moment + let mut target_outbound_count = if send_all_at_once { + layout_nodes_count + } else { + quorum + }; + // Do some requests and collect results while successes.len() < quorum { - // If the current set of requests that are running is not enough to possibly - // reach quorum, start some new requests. 
- while send_all_at_once || successes.len() + resp_stream.len() < quorum { - if let Some(fut) = requests.next() { - resp_stream.push(fut) + // If our current outbound request count is not enough, start new ones + while successes.len() + resp_stream.len() < target_outbound_count { + if let Some((fut, watchdog)) = requests.next() { + if let Some(sleep) = watchdog { + preemptive_watchdogs.push(sleep); + self.0 + .metrics + .rpc_watchdogs_started_counter + .add(1, &metric_tags); + } + resp_stream.push(fut); } else { break; } @@ -385,14 +433,45 @@ impl RpcHelper { break; } - // Wait for one request to terminate - match resp_stream.next().await.unwrap() { - Ok(msg) => { - successes.push(msg); + let response_or_watchdog = async { + if preemptive_watchdogs.is_empty() { + // We don't have any watchdogs to listen to, just wait for a request + // This avoids waiting on an empty FuturesUnordered, which creates a busy wait + resp_stream + .next() + .await + .map(|(res, to)| WatchedRPCResult::Completed(res, to)) + } else { + select! 
{ + opt = resp_stream.next() => opt.map(|(res, to)| WatchedRPCResult::Completed(res, to)), + watchdog = preemptive_watchdogs.next() => watchdog.map(WatchedRPCResult::TimedOut), + } } - Err(e) => { - errors.push(e); + }; + + // Wait for the next completed request, or for a watchdog to trigger + match response_or_watchdog.await { + Some(WatchedRPCResult::Completed(to, res)) => { + completed_node_ids.insert(to); + match res { + Ok(msg) => successes.push(msg), + Err(e) => errors.push(e), + } } + Some(WatchedRPCResult::TimedOut(to)) => { + // A watchdog has triggered, meaning one of the active requests is taking too long + // Note that we don't cancel watchdogs after requests complete, so we need to ignore those + if target_outbound_count < layout_nodes_count + && !completed_node_ids.contains(&to) + { + target_outbound_count += 1; + self.0 + .metrics + .rpc_watchdogs_preemption_counter + .add(1, &metric_tags); + } + } + None => break, } } @@ -554,7 +633,7 @@ impl RpcHelper { continue; } let nodes = ver.nodes_of(position, ver.replication_factor); - for node in rpc_helper.request_order(layout.current(), nodes) { + for (_, node) in rpc_helper.request_order(layout.current(), nodes) { if !ret.contains(&node) { ret.push(node); } @@ -567,7 +646,7 @@ impl RpcHelper { &self, layout: &LayoutVersion, nodes: impl Iterator<Item = Uuid>, - ) -> Vec<Uuid> { + ) -> Vec<(Duration, Uuid)> { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.peering.get_peer_list(); let our_zone = layout.get_node_zone(&self.0.our_node_id).unwrap_or(""); @@ -600,7 +679,7 @@ impl RpcHelper { nodes .into_iter() - .map(|(_, _, _, to)| to) + .map(|(_, _, ping, to)| (ping, to)) .collect::<Vec<_>>() } } @@ -709,3 +788,10 @@ where ) } } + +// ------- utility for tracking RPC results and watchdog triggers -------- + +enum WatchedRPCResult<S> { + Completed(Uuid, Result<S, Error>), + TimedOut(Uuid), +} diff --git a/src/table/table.rs b/src/table/table.rs index a5be2910..88c4aa07 100644 --- a/src/table/table.rs +++ 
b/src/table/table.rs @@ -317,7 +317,8 @@ impl Table { &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()), + .with_quorum(self.data.replication.read_quorum()) + .with_preemptive_send(true), ) .await?; @@ -412,7 +413,8 @@ impl Table { &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()), + .with_quorum(self.data.replication.read_quorum()) + .with_preemptive_send(true), ) .await?;