rpc_helper: don't use tokio::spawn for individual requests

This commit is contained in:
Alex 2023-11-16 16:34:01 +01:00
parent 707442f5de
commit 22f38808e7
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -299,9 +299,7 @@ impl RpcHelper {
if let Some((req_to, fut)) = requests.next() { if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to)); let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn( resp_stream.push(fut.with_context(Context::current_with_span(span)));
fut.with_context(Context::current_with_span(span)),
));
} else { } else {
break; break;
} }
@ -313,7 +311,7 @@ impl RpcHelper {
} }
// Wait for one request to terminate // Wait for one request to terminate
match resp_stream.next().await.unwrap().unwrap() { match resp_stream.next().await.unwrap() {
Ok(msg) => { Ok(msg) => {
successes.push(msg); successes.push(msg);
} }
@ -448,7 +446,7 @@ impl RpcHelper {
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", to)); let span = tracer.start(format!("RPC to {:?}", to));
let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }; let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) };
tokio::spawn(fut.with_context(Context::current_with_span(span))) fut.with_context(Context::current_with_span(span))
}); });
let mut resp_stream = requests.collect::<FuturesUnordered<_>>(); let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@ -457,9 +455,7 @@ impl RpcHelper {
let mut set_counters = vec![(0, 0); to_sets.len()]; let mut set_counters = vec![(0, 0); to_sets.len()];
while !resp_stream.is_empty() { while let Some((node, resp)) = resp_stream.next().await {
let (node, resp) = resp_stream.next().await.unwrap().unwrap();
match resp { match resp {
Ok(msg) => { Ok(msg) => {
for set in peers.get(&node).unwrap().iter() { for set in peers.get(&node).unwrap().iter() {
@ -475,12 +471,12 @@ impl RpcHelper {
} }
} }
if set_counters.iter().all(|x| x.0 >= quorum) { if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
// Success // Success
// Continue all other requets in background // Continue all other requets in background
tokio::spawn(async move { tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await; resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
}); });
return Ok(successes); return Ok(successes);
@ -489,7 +485,7 @@ impl RpcHelper {
if set_counters if set_counters
.iter() .iter()
.enumerate() .enumerate()
.any(|(i, x)| x.1 + quorum > to_sets[i].len()) .any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len())
{ {
// Too many errors in this set, we know we won't get a quorum // Too many errors in this set, we know we won't get a quorum
break; break;