WIP: Implement preemptive sends to alleviate slow RPC propagation #860
1 changed files with 12 additions and 4 deletions
|
@ -353,7 +353,7 @@ impl RpcHelper {
|
||||||
// Reorder requests to priorize closeness / low latency
|
// Reorder requests to priorize closeness / low latency
|
||||||
let request_order =
|
let request_order =
|
||||||
self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
|
self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
|
||||||
let layout_nodes_count = request_order.len();
|
let layout_nodes_count = request_order.len();
|
||||||
|
|
||||||
// The send_all_at_once flag overrides the behaviour described
|
// The send_all_at_once flag overrides the behaviour described
|
||||||
// above and sends an RPC to every node from the get-go. This
|
// above and sends an RPC to every node from the get-go. This
|
||||||
|
@ -417,7 +417,10 @@ impl RpcHelper {
|
||||||
if let Some((fut, watchdog)) = requests.next() {
|
if let Some((fut, watchdog)) = requests.next() {
|
||||||
if let Some(sleep) = watchdog {
|
if let Some(sleep) = watchdog {
|
||||||
preemptive_watchdogs.push(sleep);
|
preemptive_watchdogs.push(sleep);
|
||||||
self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags);
|
self.0
|
||||||
|
.metrics
|
||||||
|
.rpc_watchdogs_started_counter
|
||||||
|
.add(1, &metric_tags);
|
||||||
}
|
}
|
||||||
resp_stream.push(fut);
|
resp_stream.push(fut);
|
||||||
} else {
|
} else {
|
||||||
|
@ -458,9 +461,14 @@ impl RpcHelper {
|
||||||
Some(WatchedRPCResult::TimedOut(to)) => {
|
Some(WatchedRPCResult::TimedOut(to)) => {
|
||||||
// A watchdog has triggered, meaning one of the active requests is taking too long
|
// A watchdog has triggered, meaning one of the active requests is taking too long
|
||||||
// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
|
// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
|
||||||
if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) {
|
if target_outbound_count < layout_nodes_count
|
||||||
|
&& !completed_node_ids.contains(&to)
|
||||||
|
{
|
||||||
target_outbound_count += 1;
|
target_outbound_count += 1;
|
||||||
self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags);
|
self.0
|
||||||
|
.metrics
|
||||||
|
.rpc_watchdogs_preemption_counter
|
||||||
|
.add(1, &metric_tags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
|
|
Loading…
Reference in a new issue