WIP: Implement preemptive sends to alleviate slow RPC propagation #860
3 changed files with 123 additions and 25 deletions
@@ -4,6 +4,8 @@ use opentelemetry::{global, metrics::*};
 pub struct RpcMetrics {
 	pub(crate) rpc_counter: Counter<u64>,
 	pub(crate) rpc_timeout_counter: Counter<u64>,
+	pub(crate) rpc_watchdogs_started_counter: Counter<u64>,
+	pub(crate) rpc_watchdogs_preemption_counter: Counter<u64>,
 	pub(crate) rpc_netapp_error_counter: Counter<u64>,
 	pub(crate) rpc_garage_error_counter: Counter<u64>,
@@ -21,6 +23,14 @@ impl RpcMetrics {
 			.u64_counter("rpc.timeout_counter")
 			.with_description("Number of RPC timeouts")
 			.init(),
+		rpc_watchdogs_started_counter: meter
+			.u64_counter("rpc.watchdogs_started_counter")
+			.with_description("Number of RPC requests started with a watchdog")
+			.init(),
+		rpc_watchdogs_preemption_counter: meter
+			.u64_counter("rpc.watchdogs_preemption_counter")
+			.with_description("Number of RPC watchdogs which timed out and caused an extra RPC to be scheduled")
+			.init(),
 		rpc_netapp_error_counter: meter
 			.u64_counter("rpc.netapp_error_counter")
 			.with_description("Number of communication errors (errors in the Netapp library)")
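
Note: counters built this way are bumped later in the RPC helper with a set of KeyValue tags (the metric_tags array further down in this diff). A minimal standalone sketch of that pattern, with a hypothetical meter name and a single illustrative tag:

use opentelemetry::{global, KeyValue};

fn record_watchdog_started(endpoint_path: &str) {
    // Hypothetical meter name; Garage builds these counters once in RpcMetrics::new().
    let meter = global::meter("garage_rpc");
    let counter = meter
        .u64_counter("rpc.watchdogs_started_counter")
        .with_description("Number of RPC requests started with a watchdog")
        .init();
    // Mirrors `rpc_watchdogs_started_counter.add(1, &metric_tags)` in the RPC helper below.
    counter.add(1, &[KeyValue::new("rpc_endpoint", endpoint_path.to_string())]);
}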
@@ -1,5 +1,5 @@
 //! Contain structs related to making RPCs
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
@@ -38,6 +38,8 @@ pub struct RequestStrategy<T> {
 	rs_quorum: Option<usize>,
 	/// Send all requests at once
 	rs_send_all_at_once: Option<bool>,
+	/// Start with enough RPCs to reach quorum, but send extras when some take too long
+	rs_preemptive_send: Option<bool>,
 	/// Request priority
 	rs_priority: RequestPriority,
 	/// Custom timeout for this request
@@ -58,6 +60,7 @@ impl Clone for RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: (),
@@ -71,6 +74,7 @@ impl RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: None,
 			rs_send_all_at_once: None,
+			rs_preemptive_send: None,
 			rs_priority: prio,
 			rs_timeout: Timeout::Default,
 			rs_drop_on_complete: (),
@@ -81,6 +85,7 @@ impl RequestStrategy<()> {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: drop_on_complete,
@@ -94,11 +99,16 @@ impl<T> RequestStrategy<T> {
 		self.rs_quorum = Some(quorum);
 		self
 	}
-	/// Set quorum to be reached for request
+	/// Set flag to send all requests at once
 	pub fn send_all_at_once(mut self, value: bool) -> Self {
 		self.rs_send_all_at_once = Some(value);
 		self
 	}
+	/// Set flag to preemptively send extra requests after some wait
+	pub fn with_preemptive_send(mut self, value: bool) -> Self {
+		self.rs_preemptive_send = Some(value);
+		self
+	}
 	/// Deactivate timeout for this request
 	pub fn without_timeout(mut self) -> Self {
 		self.rs_timeout = Timeout::None;
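
A caller opts into the new behaviour by chaining the flag onto the existing builder, exactly as the table read path does at the end of this diff (the quorum value below is illustrative; table.rs passes self.data.replication.read_quorum()):

let strategy = RequestStrategy::with_priority(PRIO_NORMAL)
    .with_quorum(2)
    .with_preemptive_send(true);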
@@ -115,6 +125,7 @@ impl<T> RequestStrategy<T> {
 		RequestStrategy {
 			rs_quorum: self.rs_quorum,
 			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_preemptive_send: self.rs_preemptive_send,
 			rs_priority: self.rs_priority,
 			rs_timeout: self.rs_timeout,
 			rs_drop_on_complete: (),
@@ -335,29 +346,53 @@ impl RpcHelper {
 		S: Send + 'static,
 	{
 		// Once quorum is reached, other requests don't matter.
-		// What we do here is only send the required number of requests
+		// The default here is to only send the required number of requests
 		// to reach a quorum, priorizing nodes with the lowest latency.
 		// When there are errors, we start new requests to compensate.
 
-		// TODO: this could be made more aggressive, e.g. if after 2x the
-		// average ping of a given request, the response is not yet received,
-		// preemptively send an additional request to any remaining nodes.
-
 		// Reorder requests to priorize closeness / low latency
 		let request_order =
 			self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
+		let layout_nodes_count = request_order.len();
+
+		// The send_all_at_once flag overrides the behaviour described
+		// above and sends an RPC to every node from the get-go. This
+		// is more demanding on the network but also offers the best
+		// chance to reach quorum quickly.
 		let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);
+
+		// The preemptive_send flag is an attempt at compromise: we
+		// start by sending just enough to reach quorum, but associate
+		// each RPC to a watchdog which triggers after 2x the average
+		// ping for that peer. When a watchdog triggers, an extra RPC
+		// is sent as a preemptive "replacement" in case the slow node
+		// is having serious trouble and can't reply. This is
+		// overridden by send_all_at_once.
+		let preemptive_send = strategy.rs_preemptive_send.unwrap_or(false);
+		let mut preemptive_watchdogs = FuturesUnordered::new();
+		let mut completed_node_ids = HashSet::<Uuid>::new();
+		let metric_tags = [
+			KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
+			KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
+			KeyValue::new("to", format!("{:?}", to)),
+		];
 
 		// Build future for each request
 		// They are not started now: they are added below in a FuturesUnordered
 		// object that will take care of polling them (see below)
 		let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
-		let mut requests = request_order.into_iter().map(|to| {
+		let mut requests = request_order.into_iter().map(|(avg_ping, to)| {
 			let self2 = self.clone();
 			let msg = msg.clone();
 			let endpoint2 = endpoint.clone();
 			let strategy = strategy.clone();
-			async move { self2.call(&endpoint2, to, msg, strategy).await }
+			(
+				async move { (to, self2.call(&endpoint2, to, msg, strategy).await) },
+				preemptive_send.then_some(async move {
+					tokio::time::sleep(2 * avg_ping).await;
+					to
+				}),
+			)
 		});
 
 		// Vectors in which success results and errors will be collected
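
One design note on the pairing above: bool::then_some evaluates its argument eagerly, but that is harmless here because an async block does nothing until it is polled, and the tokio::time::sleep call sits inside the block, so the 2x-average-ping timer only starts once the watchdog has been pushed into the FuturesUnordered and polled. A tiny sketch of just that construction, with placeholder types:

use std::time::Duration;

// Placeholder for Garage's Uuid.
type NodeId = u64;

// Build the optional watchdog for one peer: fires after 2x its average ping.
fn watchdog_for(
    to: NodeId,
    avg_ping: Duration,
    preemptive_send: bool,
) -> Option<impl std::future::Future<Output = NodeId>> {
    preemptive_send.then_some(async move {
        // The sleep is created on first poll, so no time elapses before scheduling.
        tokio::time::sleep(2 * avg_ping).await;
        to
    })
}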
@@ -368,13 +403,26 @@ impl RpcHelper {
 		// (for the moment none, they will be added in the loop below)
 		let mut resp_stream = FuturesUnordered::new();
 
+		// The number of in-flight requests we want at the moment
+		let mut target_outbound_count = if send_all_at_once {
+			layout_nodes_count
+		} else {
+			quorum
+		};
+
 		// Do some requests and collect results
 		while successes.len() < quorum {
-			// If the current set of requests that are running is not enough to possibly
-			// reach quorum, start some new requests.
-			while send_all_at_once || successes.len() + resp_stream.len() < quorum {
-				if let Some(fut) = requests.next() {
-					resp_stream.push(fut)
+			// If our current outbound request count is not enough, start new ones
+			while successes.len() + resp_stream.len() < target_outbound_count {
+				if let Some((fut, watchdog)) = requests.next() {
+					if let Some(sleep) = watchdog {
+						preemptive_watchdogs.push(sleep);
+						self.0
+							.metrics
+							.rpc_watchdogs_started_counter
+							.add(1, &metric_tags);
+					}
+					resp_stream.push(fut);
 				} else {
 					break;
 				}
@@ -385,14 +433,45 @@ impl RpcHelper {
 				break;
 			}
 
-			// Wait for one request to terminate
-			match resp_stream.next().await.unwrap() {
-				Ok(msg) => {
-					successes.push(msg);
-				}
-				Err(e) => {
-					errors.push(e);
-				}
-			}
+			let response_or_watchdog = async {
+				if preemptive_watchdogs.is_empty() {
+					// We don't have any watchdogs to listen to, just wait for a request
+					// This avoids waiting on an empty FuturesUnordered, which creates a busy wait
+					resp_stream
+						.next()
+						.await
+						.map(|(to, res)| WatchedRPCResult::Completed(to, res))
+				} else {
+					select! {
+						opt = resp_stream.next() => opt.map(|(to, res)| WatchedRPCResult::Completed(to, res)),
+						watchdog = preemptive_watchdogs.next() => watchdog.map(WatchedRPCResult::TimedOut),
+					}
+				}
+			};
+
+			// Wait for the next completed request, or for a watchdog to trigger
+			match response_or_watchdog.await {
+				Some(WatchedRPCResult::Completed(to, res)) => {
+					completed_node_ids.insert(to);
+					match res {
+						Ok(msg) => successes.push(msg),
+						Err(e) => errors.push(e),
+					}
+				}
+				Some(WatchedRPCResult::TimedOut(to)) => {
+					// A watchdog has triggered, meaning one of the active requests is taking too long
+					// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
+					if target_outbound_count < layout_nodes_count
+						&& !completed_node_ids.contains(&to)
+					{
+						target_outbound_count += 1;
+						self.0
+							.metrics
+							.rpc_watchdogs_preemption_counter
+							.add(1, &metric_tags);
+					}
+				}
+				None => break,
+			}
 		}
 	}
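
Taken together, the last two hunks implement the following loop: start just enough requests for quorum, and each time a watchdog fires for a node that has not answered yet, raise the number of in-flight requests by one. Below is a minimal, self-contained sketch of the same control flow outside of Garage (node ids, latencies and the fake RPC are invented for illustration; it only assumes tokio and the futures crate):

use std::collections::HashSet;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::select;

type NodeId = u64;

async fn fake_rpc(node: NodeId, latency: Duration) -> (NodeId, Result<String, String>) {
    // Stand-in for self2.call(&endpoint2, to, msg, strategy).await
    tokio::time::sleep(latency).await;
    (node, Ok(format!("reply from node {node}")))
}

#[tokio::main]
async fn main() {
    // (avg_ping, node): node 2 will turn out to be unexpectedly slow, node 3 is the spare.
    let peers = vec![
        (Duration::from_millis(10), 1u64),
        (Duration::from_millis(10), 2u64),
        (Duration::from_millis(10), 3u64),
    ];
    let quorum = 2;
    let total = peers.len();

    // Pair each request future with a watchdog that fires after 2x the peer's average ping.
    let mut requests = peers.into_iter().map(|(avg_ping, node)| {
        let latency = if node == 2 {
            Duration::from_millis(500) // simulate a struggling node
        } else {
            avg_ping
        };
        (
            fake_rpc(node, latency),
            async move {
                tokio::time::sleep(2 * avg_ping).await;
                node
            },
        )
    });

    let mut resp_stream = FuturesUnordered::new();
    let mut watchdogs = FuturesUnordered::new();
    let mut completed = HashSet::new();
    let mut successes = Vec::new();
    let mut target_outbound = quorum; // start with just enough requests for quorum

    while successes.len() < quorum {
        // Top up in-flight requests until we reach the current target.
        while successes.len() + resp_stream.len() < target_outbound {
            match requests.next() {
                Some((fut, watchdog)) => {
                    watchdogs.push(watchdog);
                    resp_stream.push(fut);
                }
                None => break,
            }
        }
        if resp_stream.is_empty() {
            break;
        }

        select! {
            Some((node, res)) = resp_stream.next() => {
                completed.insert(node);
                if let Ok(msg) = res {
                    successes.push(msg);
                }
            }
            Some(node) = watchdogs.next(), if !watchdogs.is_empty() => {
                // A peer is slower than 2x its usual ping: allow one extra request.
                if target_outbound < total && !completed.contains(&node) {
                    target_outbound += 1;
                    println!("watchdog fired for node {node}, scheduling an extra RPC");
                }
            }
        }
    }

    println!("quorum reached: {successes:?}");
}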
@@ -554,7 +633,7 @@ impl RpcHelper {
 				continue;
 			}
 			let nodes = ver.nodes_of(position, ver.replication_factor);
-			for node in rpc_helper.request_order(layout.current(), nodes) {
+			for (_, node) in rpc_helper.request_order(layout.current(), nodes) {
 				if !ret.contains(&node) {
 					ret.push(node);
 				}
@@ -567,7 +646,7 @@ impl RpcHelper {
 		&self,
 		layout: &LayoutVersion,
 		nodes: impl Iterator<Item = Uuid>,
-	) -> Vec<Uuid> {
+	) -> Vec<(Duration, Uuid)> {
 		// Retrieve some status variables that we will use to sort requests
 		let peer_list = self.0.peering.get_peer_list();
 		let our_zone = layout.get_node_zone(&self.0.our_node_id).unwrap_or("");
@@ -600,7 +679,7 @@ impl RpcHelper {
 
 		nodes
 			.into_iter()
-			.map(|(_, _, _, to)| to)
+			.map(|(_, _, ping, to)| (ping, to))
 			.collect::<Vec<_>>()
 	}
 }
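
For intuition, the new return shape can be pictured with this simplified stand-in (the real request_order also weighs zone locality and other criteria before mapping down to the pair; this sketch only keeps the ping-based ordering):

use std::time::Duration;

// Placeholder for Garage's Uuid.
type NodeId = u64;

// Simplified sketch: order candidate peers by average ping and keep the ping,
// so the caller can derive a per-peer watchdog delay (2x avg_ping in this PR).
fn request_order_sketch(mut peers: Vec<(Duration, NodeId)>) -> Vec<(Duration, NodeId)> {
    peers.sort_by_key(|(ping, _)| *ping);
    peers
}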
@@ -709,3 +788,10 @@ where
 		)
 	}
 }
+
+// ------- utility for tracking RPC results and watchdog triggers --------
+
+enum WatchedRPCResult<S> {
+	Completed(Uuid, Result<S, Error>),
+	TimedOut(Uuid),
+}
@@ -317,7 +317,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum()),
+					.with_quorum(self.data.replication.read_quorum())
+					.with_preemptive_send(true),
 			)
 			.await?;
 
@@ -412,7 +413,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum()),
+					.with_quorum(self.data.replication.read_quorum())
+					.with_preemptive_send(true),
 			)
 			.await?;
 