From e628072d37d83a49cc85a962e755829ff61bbadf Mon Sep 17 00:00:00 2001
From: Julien Kritter
Date: Fri, 23 Aug 2024 13:31:48 +0200
Subject: [PATCH 1/3] rpc: implement preemptive sends to alleviate slow RPC propagation

---
 src/rpc/rpc_helper.rs | 116 +++++++++++++++++++++++++++++++++---------
 src/table/table.rs    |   6 ++-
 2 files changed, 97 insertions(+), 25 deletions(-)

diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index ea3e5e76..eb412912 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,5 +1,5 @@
 //! Contain structs related to making RPCs
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
@@ -38,6 +38,8 @@ pub struct RequestStrategy {
 	rs_quorum: Option<usize>,
 	/// Send all requests at once
 	rs_send_all_at_once: Option<bool>,
+	/// Start with enough RPCs to reach quorum, but send extras when some take too long
+	rs_preemptive_send: Option<bool>,
 	/// Request priority
 	rs_priority: RequestPriority,
 	/// Custom timeout for this request
@@ -58,6 +60,7 @@ impl Clone for RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: (),
@@ -71,6 +74,7 @@ impl RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: None,
 			rs_send_all_at_once: None,
+			rs_preemptive_send: None,
 			rs_priority: prio,
 			rs_timeout: Timeout::Default,
 			rs_drop_on_complete: (),
@@ -81,6 +85,7 @@ impl RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: drop_on_complete,
@@ -94,11 +99,16 @@ impl RequestStrategy {
 		self.rs_quorum = Some(quorum);
 		self
 	}
-	/// Set quorum to be reached for request
+	/// Set flag to send all requests at once
 	pub fn send_all_at_once(mut self, value: bool) -> Self {
 		self.rs_send_all_at_once = Some(value);
 		self
 	}
+	/// Set flag to preemptively send extra requests after some wait
+	pub fn with_preemptive_send(mut self, value: bool) -> Self {
+		self.rs_preemptive_send = Some(value);
+		self
+	}
 	/// Deactivate timeout for this request
 	pub fn without_timeout(mut self) -> Self {
 		self.rs_timeout = Timeout::None;
@@ -115,6 +125,7 @@ impl RequestStrategy {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: (),
@@ -335,29 +346,47 @@ impl RpcHelper {
 		S: Send + 'static,
 	{
 		// Once quorum is reached, other requests don't matter.
-		// What we do here is only send the required number of requests
+		// The default here is to only send the required number of requests
 		// to reach a quorum, priorizing nodes with the lowest latency.
 		// When there are errors, we start new requests to compensate.
-		// TODO: this could be made more aggressive, e.g. if after 2x the
-		// average ping of a given request, the response is not yet received,
-		// preemptively send an additional request to any remaining nodes.
-
 		// Reorder requests to priorize closeness / low latency
 		let request_order =
 			self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
+
+		// The send_all_at_once flag overrides the behaviour described
+		// above and sends an RPC to every node from the get-go. This
+		// is more demanding on the network but also offers the best
+		// chance to reach quorum quickly.
 		let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);
+		// The preemptive_send flag is an attempt at compromise: we
+		// start by sending just enough to reach quorum, but associate
+		// each RPC to a watchdog which triggers after 2x the average
+		// ping for that peer. When a watchdog triggers, an extra RPC
+		// is sent as a preemptive "replacement" in case the slow node
+		// is having serious trouble and can't reply. This is
+		// overridden by send_all_at_once.
+		let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false);
+		let mut preemptive_watchdogs = FuturesUnordered::new();
+		let mut completed_node_ids = HashSet::<Uuid>::new();
+
 		// Build future for each request
 		// They are not started now: they are added below in a FuturesUnordered
 		// object that will take care of polling them (see below)
 		let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
-		let mut requests = request_order.into_iter().map(|to| {
+		let mut requests = request_order.into_iter().map(|(avg_ping, to)| {
 			let self2 = self.clone();
 			let msg = msg.clone();
 			let endpoint2 = endpoint.clone();
 			let strategy = strategy.clone();
-			async move { self2.call(&endpoint2, to, msg, strategy).await }
+			(
+				async move { (to, self2.call(&endpoint2, to, msg, strategy).await) },
+				preemptive_send.then_some(async move {
+					tokio::time::sleep(2 * avg_ping).await;
+					to
+				}),
+			)
 		});
 
 		// Vectors in which success results and errors will be collected
@@ -368,13 +397,22 @@ impl RpcHelper {
 		// (for the moment none, they will be added in the loop below)
 		let mut resp_stream = FuturesUnordered::new();
 
+		// The number of in-flight requests we want at the moment
+		let mut target_outbound_count = if send_all_at_once {
+			requests.len()
+		} else {
+			quorum
+		};
+
 		// Do some requests and collect results
 		while successes.len() < quorum {
-			// If the current set of requests that are running is not enough to possibly
-			// reach quorum, start some new requests.
-			while send_all_at_once || successes.len() + resp_stream.len() < quorum {
-				if let Some(fut) = requests.next() {
-					resp_stream.push(fut)
+			// If our current outbound request count is not enough, start new ones
+			while successes.len() + resp_stream.len() < target_outbound_count {
+				if let Some((fut, watchdog)) = requests.next() {
+					if let Some(sleep) = watchdog {
+						preemptive_watchdogs.push(sleep);
+					}
+					resp_stream.push(fut);
 				} else {
 					break;
 				}
@@ -385,14 +423,39 @@ impl RpcHelper {
 				break;
 			}
 
-			// Wait for one request to terminate
-			match resp_stream.next().await.unwrap() {
-				Ok(msg) => {
-					successes.push(msg);
+			let response_or_watchdog = async {
+				if preemptive_watchdogs.is_empty() {
+					// We don't have any watchdogs to listen to, just wait for a request
+					// This avoids waiting on an empty FuturesUnordered, which creates a busy wait
+					resp_stream
+						.next()
+						.await
+						.map(|(res, to)| WatchedRPCResult::Completed(res, to))
+				} else {
+					select! {
+						opt = resp_stream.next() => opt.map(|(res, to)| WatchedRPCResult::Completed(res, to)),
+						watchdog = preemptive_watchdogs.next() => watchdog.map(WatchedRPCResult::TimedOut),
+					}
 				}
-				Err(e) => {
-					errors.push(e);
+			};
+
+			// Wait for the next completed request, or for a watchdog to trigger
+			match response_or_watchdog.await {
+				Some(WatchedRPCResult::Completed(to, res)) => {
+					completed_node_ids.insert(to);
+					match res {
+						Ok(msg) => successes.push(msg),
+						Err(e) => errors.push(e),
+					}
 				}
+				Some(WatchedRPCResult::TimedOut(to)) => {
+					// A watchdog has triggered, meaning one of the active requests is taking too long
+					// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
+					if !completed_node_ids.contains(&to) {
+						target_outbound_count += 1;
+					}
+				}
+				None => break,
 			}
 		}
@@ -554,7 +617,7 @@ impl RpcHelper {
 				continue;
 			}
 			let nodes = ver.nodes_of(position, ver.replication_factor);
-			for node in rpc_helper.request_order(layout.current(), nodes) {
+			for (_, node) in rpc_helper.request_order(layout.current(), nodes) {
 				if !ret.contains(&node) {
 					ret.push(node);
 				}
@@ -567,7 +630,7 @@ impl RpcHelper {
 		&self,
 		layout: &LayoutVersion,
 		nodes: impl Iterator<Item = Uuid>,
-	) -> Vec<Uuid> {
+	) -> Vec<(Duration, Uuid)> {
 		// Retrieve some status variables that we will use to sort requests
 		let peer_list = self.0.peering.get_peer_list();
 		let our_zone = layout.get_node_zone(&self.0.our_node_id).unwrap_or("");
@@ -600,7 +663,7 @@ impl RpcHelper {
 
 		nodes
 			.into_iter()
-			.map(|(_, _, _, to)| to)
+			.map(|(_, _, ping, to)| (ping, to))
 			.collect::<Vec<_>>()
 	}
 }
@@ -709,3 +772,10 @@ where
 		)
 	}
 }
+
+// ------- utility for tracking RPC results and watchdog triggers --------
+
+enum WatchedRPCResult<S> {
+	Completed(Uuid, Result<S, Error>),
+	TimedOut(Uuid),
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index a5be2910..88c4aa07 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -317,7 +317,8 @@ impl Table {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum()),
+					.with_quorum(self.data.replication.read_quorum())
+					.with_preemptive_send(true),
 			)
 			.await?;
 
@@ -412,7 +413,8 @@ impl Table {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum()),
+					.with_quorum(self.data.replication.read_quorum())
+					.with_preemptive_send(true),
 			)
 			.await?;
-- 
2.45.2

From 5b6521b868a88d65f2b75c66c819629de1f27a63 Mon Sep 17 00:00:00 2001
From: Julien Kritter
Date: Mon, 26 Aug 2024 11:19:28 +0200
Subject: [PATCH 2/3] rpc: add watchdog metrics, ignore edge cases beyond the layout size

---
 src/rpc/metrics.rs    | 10 ++++++++++
 src/rpc/rpc_helper.rs | 12 ++++++++++--
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs
index 61f8fa79..10327e96 100644
--- a/src/rpc/metrics.rs
+++ b/src/rpc/metrics.rs
@@ -4,6 +4,8 @@ use opentelemetry::{global, metrics::*};
 pub struct RpcMetrics {
 	pub(crate) rpc_counter: Counter<u64>,
 	pub(crate) rpc_timeout_counter: Counter<u64>,
+	pub(crate) rpc_watchdogs_started_counter: Counter<u64>,
+	pub(crate) rpc_watchdogs_preemption_counter: Counter<u64>,
 	pub(crate) rpc_netapp_error_counter: Counter<u64>,
 	pub(crate) rpc_garage_error_counter: Counter<u64>,
@@ -21,6 +23,14 @@ impl RpcMetrics {
 				.u64_counter("rpc.timeout_counter")
 				.with_description("Number of RPC timeouts")
 				.init(),
+			rpc_watchdogs_started_counter: meter
+				.u64_counter("rpc.watchdogs_started_counter")
+				.with_description("Number of RPC requests started with a watchdog")
+				.init(),
+			rpc_watchdogs_preemption_counter: meter
+				.u64_counter("rpc.watchdogs_preemption_counter")
+				.with_description("Number of RPC watchdogs which timed out and caused an extra RPC to be scheduled")
+				.init(),
 			rpc_netapp_error_counter: meter
 				.u64_counter("rpc.netapp_error_counter")
 				.with_description("Number of communication errors (errors in the Netapp library)")
 				.init(),
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index eb412912..6812a135 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -353,6 +353,7 @@ impl RpcHelper {
 		// Reorder requests to priorize closeness / low latency
 		let request_order =
 			self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
+        let layout_nodes_count = request_order.len();
 
 		// The send_all_at_once flag overrides the behaviour described
 		// above and sends an RPC to every node from the get-go. This
 		// is more demanding on the network but also offers the best
@@ -370,6 +371,11 @@ impl RpcHelper {
 		let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false);
 		let mut preemptive_watchdogs = FuturesUnordered::new();
 		let mut completed_node_ids = HashSet::<Uuid>::new();
+		let metric_tags = [
+			KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
+			KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
+			KeyValue::new("to", format!("{:?}", to)),
+		];
 
 		// Build future for each request
 		// They are not started now: they are added below in a FuturesUnordered
 		// object that will take care of polling them (see below)
@@ -399,7 +405,7 @@ impl RpcHelper {
 
 		// The number of in-flight requests we want at the moment
 		let mut target_outbound_count = if send_all_at_once {
-			requests.len()
+			layout_nodes_count
 		} else {
 			quorum
 		};
@@ -411,6 +417,7 @@ impl RpcHelper {
 			if let Some((fut, watchdog)) = requests.next() {
 				if let Some(sleep) = watchdog {
 					preemptive_watchdogs.push(sleep);
+					self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags);
 				}
 				resp_stream.push(fut);
 			} else {
 				break;
 			}
@@ -451,8 +458,9 @@ impl RpcHelper {
 				Some(WatchedRPCResult::TimedOut(to)) => {
 					// A watchdog has triggered, meaning one of the active requests is taking too long
 					// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
-					if !completed_node_ids.contains(&to) {
+					if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) {
 						target_outbound_count += 1;
+						self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags);
 					}
 				}
 				None => break,
-- 
2.45.2

From 1ca12a9cb621eb907864f9987376d95cecdd9851 Mon Sep 17 00:00:00 2001
From: Julien Kritter
Date: Mon, 26 Aug 2024 15:36:04 +0200
Subject: [PATCH 3/3] rpc: formatting fixes in rpc_helper

---
 src/rpc/rpc_helper.rs | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 6812a135..de60bfd3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -353,7 +353,7 @@ impl RpcHelper {
 		// Reorder requests to priorize closeness / low latency
 		let request_order =
 			self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
-        let layout_nodes_count = request_order.len();
+		let layout_nodes_count = request_order.len();
 
 		// The send_all_at_once flag overrides the behaviour described
 		// above and sends an RPC to every node from the get-go. This
 		// is more demanding on the network but also offers the best
 		// chance to reach quorum quickly.
@@ -417,7 +417,10 @@ impl RpcHelper {
 			if let Some((fut, watchdog)) = requests.next() {
 				if let Some(sleep) = watchdog {
 					preemptive_watchdogs.push(sleep);
-					self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags);
+					self.0
+						.metrics
+						.rpc_watchdogs_started_counter
+						.add(1, &metric_tags);
 				}
 				resp_stream.push(fut);
 			} else {
@@ -458,9 +461,14 @@ impl RpcHelper {
 				Some(WatchedRPCResult::TimedOut(to)) => {
 					// A watchdog has triggered, meaning one of the active requests is taking too long
 					// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
-					if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) {
+					if target_outbound_count < layout_nodes_count
+						&& !completed_node_ids.contains(&to)
+					{
 						target_outbound_count += 1;
-						self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags);
+						self.0
+							.metrics
+							.rpc_watchdogs_preemption_counter
+							.add(1, &metric_tags);
 					}
 				}
 				None => break,
-- 
2.45.2
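
For reference, here is a minimal, self-contained sketch (separate from the patches above, not Garage code) of the preemptive-send pattern that patch 1 implements inside try_call_many_inner: start only enough requests to reach quorum, arm a per-request watchdog at 2x that peer's average ping, and raise the number of in-flight requests whenever a watchdog fires before its request has completed. The node ids, ping values and the do_rpc stub are made-up placeholders. The real patch additionally caps the target at the number of layout nodes and records metrics (patches 2 and 3).

// Illustrative sketch only; assumes tokio = { version = "1", features = ["full"] } and futures = "0.3".
use std::collections::HashSet;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::select;

// Stand-in for RpcHelper::call(): every node answers after 40ms here,
// which is slower than 2x its average ping, so watchdogs do fire.
async fn do_rpc(node: u64) -> (u64, Result<String, String>) {
    tokio::time::sleep(Duration::from_millis(40)).await;
    (node, Ok(format!("response from node {node}")))
}

#[tokio::main]
async fn main() {
    // (average ping, node id), already sorted by closeness.
    let nodes = vec![
        (Duration::from_millis(10), 1u64),
        (Duration::from_millis(15), 2),
        (Duration::from_millis(20), 3),
        (Duration::from_millis(25), 4),
    ];
    let quorum = 2usize;

    // Each entry pairs a request future with a watchdog future for that node.
    let mut requests = nodes.into_iter().map(|(avg_ping, node)| {
        (do_rpc(node), async move {
            // The watchdog fires if the node has not answered within 2x its average ping.
            tokio::time::sleep(2 * avg_ping).await;
            node
        })
    });

    let mut resp_stream = FuturesUnordered::new();
    let mut watchdogs = FuturesUnordered::new();
    let mut completed = HashSet::new();
    let mut successes = Vec::new();
    let mut target_outbound = quorum;

    while successes.len() < quorum {
        // Keep as many requests in flight as the current target demands.
        while successes.len() + resp_stream.len() < target_outbound {
            match requests.next() {
                Some((fut, wd)) => {
                    resp_stream.push(fut);
                    watchdogs.push(wd);
                }
                None => break,
            }
        }
        if successes.len() + resp_stream.len() < quorum {
            break; // quorum can no longer be reached
        }
        select! {
            Some((node, res)) = resp_stream.next() => {
                completed.insert(node);
                if let Ok(msg) = res {
                    successes.push(msg);
                }
            }
            Some(node) = watchdogs.next() => {
                // A request is slow: allow one extra RPC, unless the watched
                // request already completed (watchdogs are never cancelled).
                if !completed.contains(&node) {
                    target_outbound += 1;
                }
            }
        }
    }

    println!("quorum reached with {} responses: {:?}", successes.len(), successes);
}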