diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 6812a135..de60bfd3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -353,7 +353,7 @@ impl RpcHelper {
 		// Reorder requests to priorize closeness / low latency
 		let request_order =
 			self.request_order(&self.0.layout.read().unwrap().current(), to.iter().copied());
-		let layout_nodes_count = request_order.len();
+		let layout_nodes_count = request_order.len();
 
 		// The send_all_at_once flag overrides the behaviour described
 		// above and sends an RPC to every node from the get-go. This
@@ -417,7 +417,10 @@ impl RpcHelper {
 				if let Some((fut, watchdog)) = requests.next() {
 					if let Some(sleep) = watchdog {
 						preemptive_watchdogs.push(sleep);
-						self.0.metrics.rpc_watchdogs_started_counter.add(1, &metric_tags);
+						self.0
+							.metrics
+							.rpc_watchdogs_started_counter
+							.add(1, &metric_tags);
 					}
 					resp_stream.push(fut);
 				} else {
@@ -458,9 +461,14 @@ impl RpcHelper {
 				Some(WatchedRPCResult::TimedOut(to)) => {
 					// A watchdog has triggered, meaning one of the active requests is taking too long
 					// Note that we don't cancel watchdogs after requests complete, so we need to ignore those
-					if target_outbound_count < layout_nodes_count && !completed_node_ids.contains(&to) {
+					if target_outbound_count < layout_nodes_count
+						&& !completed_node_ids.contains(&to)
+					{
 						target_outbound_count += 1;
-						self.0.metrics.rpc_watchdogs_preemption_counter.add(1, &metric_tags);
+						self.0
+							.metrics
+							.rpc_watchdogs_preemption_counter
+							.add(1, &metric_tags);
 					}
 				}
 				None => break,
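
// Context for the hunks above: they touch the preemptive-watchdog logic that
// escalates to an extra node when an in-flight RPC is slow. Below is a minimal,
// self-contained sketch of that general pattern, NOT the Garage implementation:
// every name here (fake_rpc, watchdog, WATCHDOG_DELAY, the latency table,
// quorum) is an illustrative assumption, and it only relies on the `tokio`
// (with default features) and `futures` crates.

use std::collections::HashSet;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::sleep;

const WATCHDOG_DELAY: Duration = Duration::from_millis(200);

/// Stand-in for a real RPC: node `id` answers after `latency`.
async fn fake_rpc(id: usize, latency: Duration) -> (usize, &'static str) {
	sleep(latency).await;
	(id, "ok")
}

/// Watchdog future: resolves to the id of the node it watches once
/// WATCHDOG_DELAY has elapsed, whether or not that node has answered.
async fn watchdog(id: usize) -> usize {
	sleep(WATCHDOG_DELAY).await;
	id
}

#[tokio::main]
async fn main() {
	// Simulated per-node latencies; nodes 0 and 2 are "slow".
	let latencies = [300u64, 50, 500, 80, 120].map(Duration::from_millis);
	let quorum = 3;

	let mut resp_stream = FuturesUnordered::new();
	let mut watchdogs = FuturesUnordered::new();
	let mut completed: HashSet<usize> = HashSet::new();
	let mut results = Vec::new();

	// Start with `quorum` outstanding requests; timeouts raise the target.
	let mut target_outbound = quorum;
	let mut launched = 0;

	while results.len() < quorum {
		// Launch requests (each paired with its own watchdog) up to the target.
		while launched < target_outbound && launched < latencies.len() {
			resp_stream.push(fake_rpc(launched, latencies[launched]));
			watchdogs.push(watchdog(launched));
			launched += 1;
		}

		tokio::select! {
			Some((id, body)) = resp_stream.next() => {
				completed.insert(id);
				results.push((id, body));
			}
			Some(id) = watchdogs.next() => {
				// A watchdog fired: if its node has not answered yet and there
				// are still unused nodes, allow one more outstanding request.
				// Watchdogs of already-completed requests are ignored, which
				// mirrors the completed_node_ids check in the diff above.
				if !completed.contains(&id) && target_outbound < latencies.len() {
					target_outbound += 1;
				}
			}
			else => break,
		}
	}

	println!("quorum reached with {:?}", results);
}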