From 6ce14e2c9eb1ba81add3f61377a5a83854880b42 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Apr 2020 23:13:15 +0200 Subject: [PATCH] Make all requests continue in the background even after we got enough responses. --- src/api_server.rs | 4 ++-- src/membership.rs | 2 +- src/rpc_client.rs | 39 +++++++++++++++++++++++++-------------- src/rpc_server.rs | 2 +- src/table.rs | 3 ++- 5 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/api_server.rs b/src/api_server.rs index 441fbe1..f3f7165 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -219,7 +219,7 @@ async fn put_block(garage: Arc, hash: Hash, data: Vec) -> Result<(), rpc_try_call_many( garage.system.clone(), &who[..], - &Message::PutBlock(PutBlockMessage { hash, data }), + Message::PutBlock(PutBlockMessage { hash, data }), (garage.system.config.data_replication_factor + 1) / 2, BLOCK_RW_TIMEOUT, ) @@ -366,7 +366,7 @@ async fn get_block(garage: Arc, hash: &Hash) -> Result, Error> { let resps = rpc_try_call_many( garage.system.clone(), &who[..], - &Message::GetBlock(hash.clone()), + Message::GetBlock(hash.clone()), 1, BLOCK_RW_TIMEOUT, ) diff --git a/src/membership.rs b/src/membership.rs index f511a4f..b49607b 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -270,7 +270,7 @@ impl System { .filter(|x| **x != self.id) .cloned() .collect::>(); - rpc_call_many(self.clone(), &to[..], &msg, timeout).await; + rpc_call_many(self.clone(), &to[..], msg, timeout).await; } pub async fn bootstrap(self: Arc) { diff --git a/src/rpc_client.rs b/src/rpc_client.rs index a1c5dde..81d2096 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::borrow::Borrow; use bytes::IntoBuf; use futures::stream::futures_unordered::FuturesUnordered; @@ -19,12 +20,13 @@ use crate::tls_util; pub async fn rpc_call_many( sys: Arc, to: &[UUID], - msg: &Message, + msg: Message, timeout: Duration, ) -> Vec> { + let msg = Arc::new(msg); let mut resp_stream = to .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .map(|to| rpc_call(sys.clone(), to, msg.clone(), timeout)) .collect::>(); let mut results = vec![]; @@ -37,13 +39,15 @@ pub async fn rpc_call_many( pub async fn rpc_try_call_many( sys: Arc, to: &[UUID], - msg: &Message, + msg: Message, stop_after: usize, timeout: Duration, ) -> Result, Error> { - let mut resp_stream = to - .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + let sys2 = sys.clone(); + let msg = Arc::new(msg); + let mut resp_stream = to.to_vec() + .into_iter() + .map(move |to| rpc_call(sys2.clone(), to.clone(), msg.clone(), timeout)) .collect::>(); let mut results = vec![]; @@ -64,6 +68,13 @@ pub async fn rpc_try_call_many( } if results.len() >= stop_after { + // Continue requests in background + // TODO: make this optionnal (only usefull for write requests) + sys.background.spawn(async move { + resp_stream.collect::>().await; + Ok(()) + }); + Ok(results) } else { let mut msg = "Too many failures:".to_string(); @@ -74,17 +85,17 @@ pub async fn rpc_try_call_many( } } -pub async fn rpc_call( +pub async fn rpc_call, N: Borrow>( sys: Arc, - to: &UUID, - msg: &Message, + to: N, + msg: M, timeout: Duration, ) -> Result { let addr = { let status = sys.status.borrow().clone(); - match status.nodes.get(to) { + match status.nodes.get(to.borrow()) { Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))), + None => return Err(Error::Message(format!("Peer ID not found: {:?}", to.borrow()))), } }; sys.rpc_client.call(&addr, msg, timeout).await @@ -119,10 +130,10 @@ impl RpcClient { } } - pub async fn call( + pub async fn call>( &self, to_addr: &SocketAddr, - msg: &Message, + msg: M, timeout: Duration, ) -> Result { let uri = match self { @@ -133,7 +144,7 @@ impl RpcClient { let req = Request::builder() .method(Method::POST) .uri(uri) - .body(Body::from(rmp_to_vec_all_named(msg)?))?; + .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; let resp_fut = match self { RpcClient::HTTP(client) => client.request(req).fuse(), diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 3527eda..b18e366 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -12,7 +12,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; -use crate::data::{rmp_to_vec_all_named, debug_serialize}; +use crate::data::*; use crate::error::Error; use crate::proto::Message; use crate::server::Garage; diff --git a/src/table.rs b/src/table.rs index 162f98e..3336451 100644 --- a/src/table.rs +++ b/src/table.rs @@ -280,7 +280,7 @@ impl Table { let resps = rpc_try_call_many( self.system.clone(), who, - &rpc_msg, + rpc_msg, quorum, self.param.timeout, ) @@ -384,6 +384,7 @@ impl Table { } pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); // TODO Ok(()) }