diff --git a/src/block/manager.rs b/src/block/manager.rs index 2bb9c23d..0ca8bc31 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -354,8 +354,7 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { - // TODO: use quorums among latest write set - let who = self.replication.storage_nodes(&hash); + let who = self.replication.write_sets(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await @@ -365,9 +364,9 @@ impl BlockManager { self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()), diff --git a/src/block/resync.rs b/src/block/resync.rs index 122d0142..15f210e4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -434,7 +434,7 @@ impl BlockResyncManager { .rpc_helper() .try_call_many( &manager.endpoint, - &need_nodes[..], + &need_nodes, put_block_message, RequestStrategy::with_priority(PRIO_BACKGROUND) .with_quorum(need_nodes.len()), diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index aa3323d5..863a068a 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -134,16 +134,14 @@ impl K2VRpcHandler { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, K2VRpc::InsertItem(InsertedItem { partition, sort_key, causal_context, value, }), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -192,9 +190,7 @@ impl K2VRpcHandler { &self.endpoint, &nodes[..], K2VRpc::InsertManyItems(items), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; Ok::<_, Error>((nodes, resp)) @@ -223,7 +219,7 @@ impl K2VRpcHandler { }, sort_key, }; - // TODO figure this out with write sets, does it still work???? + // TODO figure this out with write sets, is it still appropriate??? let nodes = self .item_table .data @@ -232,7 +228,7 @@ impl K2VRpcHandler { let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, - &nodes[..], + &nodes, K2VRpc::PollItem { key: poll_key, causal_context, @@ -240,6 +236,7 @@ impl K2VRpcHandler { }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.item_table.data.replication.read_quorum()) + .send_all_at_once(true) .without_timeout(), ); let timeout_duration = diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index ce291068..12d073b6 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,4 +1,5 @@ //! Contain structs related to making RPCs +use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -35,11 +36,11 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); #[derive(Copy, Clone)] pub struct RequestStrategy { /// Min number of response to consider the request successful - pub rs_quorum: Option, - /// Should requests be dropped after enough response are received - pub rs_interrupt_after_quorum: bool, + rs_quorum: Option, + /// Send all requests at once + rs_send_all_at_once: Option, /// Request priority - pub rs_priority: RequestPriority, + rs_priority: RequestPriority, /// Custom timeout for this request rs_timeout: Timeout, } @@ -56,7 +57,7 @@ impl RequestStrategy { pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { rs_quorum: None, - rs_interrupt_after_quorum: false, + rs_send_all_at_once: None, rs_priority: prio, rs_timeout: Timeout::Default, } @@ -66,10 +67,9 @@ impl RequestStrategy { self.rs_quorum = Some(quorum); self } - /// Set if requests can be dropped after quorum has been reached - /// In general true for read requests, and false for write - pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { - self.rs_interrupt_after_quorum = interrupt; + /// Set quorum to be reached for request + pub fn send_all_at_once(mut self, value: bool) -> Self { + self.rs_send_all_at_once = Some(value); self } /// Deactivate timeout for this request @@ -235,31 +235,19 @@ impl RpcHelper { let quorum = strategy.rs_quorum.unwrap_or(to.len()); let tracer = opentelemetry::global::tracer("garage"); - let span_name = if strategy.rs_interrupt_after_quorum { - format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len()) - } else { - format!( - "RPC {} to {} (quorum {})", - endpoint.path(), - to.len(), - quorum - ) - }; + let span_name = format!("Read RPC {} to {} of {}", endpoint.path(), quorum, to.len()); + let mut span = tracer.start(span_name); span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id))); span.set_attribute(KeyValue::new("to", format!("{:?}", to))); span.set_attribute(KeyValue::new("quorum", quorum as i64)); - span.set_attribute(KeyValue::new( - "interrupt_after_quorum", - strategy.rs_interrupt_after_quorum.to_string(), - )); - self.try_call_many_internal(endpoint, to, msg, strategy, quorum) + self.try_call_many_inner(endpoint, to, msg, strategy, quorum) .with_context(Context::current_with_span(span)) .await } - async fn try_call_many_internal( + async fn try_call_many_inner( &self, endpoint: &Arc>, to: &[Uuid], @@ -273,12 +261,20 @@ impl RpcHelper { H: StreamingEndpointHandler + 'static, S: Send + 'static, { - let msg = msg.into_req().map_err(netapp::error::Error::from)?; + // Once quorum is reached, other requests don't matter. + // What we do here is only send the required number of requests + // to reach a quorum, priorizing nodes with the lowest latency. + // When there are errors, we start new requests to compensate. + + // Reorder requests to priorize closeness / low latency + let request_order = self.request_order(to); + let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false); // Build future for each request // They are not started now: they are added below in a FuturesUnordered // object that will take care of polling them (see below) - let requests = to.iter().cloned().map(|to| { + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let mut requests = request_order.into_iter().map(|to| { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); @@ -291,92 +287,39 @@ impl RpcHelper { let mut successes = vec![]; let mut errors = vec![]; - if strategy.rs_interrupt_after_quorum { - // Case 1: once quorum is reached, other requests don't matter. - // What we do here is only send the required number of requests - // to reach a quorum, priorizing nodes with the lowest latency. - // When there are errors, we start new requests to compensate. + // resp_stream will contain all of the requests that are currently in flight. + // (for the moment none, they will be added in the loop below) + let mut resp_stream = FuturesUnordered::new(); - // Reorder requests to priorize closeness / low latency - let request_order = self.request_order(to); - let mut ord_requests = vec![(); request_order.len()] - .into_iter() - .map(|_| None) - .collect::>(); - for (to, fut) in requests { - let i = request_order.iter().position(|x| *x == to).unwrap(); - ord_requests[i] = Some((to, fut)); - } - - // Make an iterator to take requests in their sorted order - let mut requests = ord_requests.into_iter().map(Option::unwrap); - - // resp_stream will contain all of the requests that are currently in flight. - // (for the moment none, they will be added in the loop below) - let mut resp_stream = FuturesUnordered::new(); - - // Do some requests and collect results - 'request_loop: while successes.len() < quorum { - // If the current set of requests that are running is not enough to possibly - // reach quorum, start some new requests. - while successes.len() + resp_stream.len() < quorum { - if let Some((req_to, fut)) = requests.next() { - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer.start(format!("RPC to {:?}", req_to)); - resp_stream.push(tokio::spawn( - fut.with_context(Context::current_with_span(span)), - )); - } else { - // If we have no request to add, we know that we won't ever - // reach quorum: bail out now. - break 'request_loop; - } - } - assert!(!resp_stream.is_empty()); // because of loop invariants - - // Wait for one request to terminate - match resp_stream.next().await.unwrap().unwrap() { - Ok(msg) => { - successes.push(msg); - } - Err(e) => { - errors.push(e); - } - } - } - } else { - // Case 2: all of the requests need to be sent in all cases, - // and need to terminate. (this is the case for writes that - // must be spread to n nodes) - // Just start all the requests in parallel and return as soon - // as the quorum is reached. - let mut resp_stream = requests - .map(|(_, fut)| fut) - .collect::>(); - - while let Some(resp) = resp_stream.next().await { - match resp { - Ok(msg) => { - successes.push(msg); - if successes.len() >= quorum { - break; - } - } - Err(e) => { - errors.push(e); - } + // Do some requests and collect results + while successes.len() < quorum { + // If the current set of requests that are running is not enough to possibly + // reach quorum, start some new requests. + while send_all_at_once || successes.len() + resp_stream.len() < quorum { + if let Some((req_to, fut)) = requests.next() { + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer.start(format!("RPC to {:?}", req_to)); + resp_stream.push(tokio::spawn( + fut.with_context(Context::current_with_span(span)), + )); + } else { + break; } } - if !resp_stream.is_empty() { - // Continue remaining requests in background. - // Note: these requests can get interrupted on process shutdown, - // we must not count on them being executed for certain. - // For all background things that have to happen with certainty, - // they have to be put in a proper queue that is persisted to disk. - tokio::spawn(async move { - resp_stream.collect::>>().await; - }); + if successes.len() + resp_stream.len() < quorum { + // We know we won't ever reach quorum + break; + } + + // Wait for one request to terminate + match resp_stream.next().await.unwrap().unwrap() { + Ok(msg) => { + successes.push(msg); + } + Err(e) => { + errors.push(e); + } } } @@ -432,4 +375,123 @@ impl RpcHelper { .map(|(_, _, _, to)| to) .collect::>() } + + pub async fn try_write_many_sets( + &self, + endpoint: &Arc>, + to_sets: &[Vec], + msg: N, + strategy: RequestStrategy, + ) -> Result, Error> + where + M: Rpc> + 'static, + N: IntoReq, + H: StreamingEndpointHandler + 'static, + S: Send + 'static, + { + let quorum = strategy + .rs_quorum + .expect("internal error: missing quroum in try_write_many_sets"); + + let tracer = opentelemetry::global::tracer("garage"); + let span_name = format!( + "Write RPC {} (quorum {} in {} sets)", + endpoint.path(), + quorum, + to_sets.len() + ); + + let mut span = tracer.start(span_name); + span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id))); + span.set_attribute(KeyValue::new("to", format!("{:?}", to_sets))); + span.set_attribute(KeyValue::new("quorum", quorum as i64)); + + self.try_write_many_sets_inner(endpoint, to_sets, msg, strategy, quorum) + .with_context(Context::current_with_span(span)) + .await + } + + async fn try_write_many_sets_inner( + &self, + endpoint: &Arc>, + to_sets: &[Vec], + msg: N, + strategy: RequestStrategy, + quorum: usize, + ) -> Result, Error> + where + M: Rpc> + 'static, + N: IntoReq, + H: StreamingEndpointHandler + 'static, + S: Send + 'static, + { + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + + let mut peers = HashMap::>::new(); + for (i, set) in to_sets.iter().enumerate() { + for peer in set.iter() { + peers.entry(*peer).or_default().push(i); + } + } + + let requests = peers.iter().map(|(peer, _)| { + let self2 = self.clone(); + let msg = msg.clone(); + let endpoint2 = endpoint.clone(); + let to = *peer; + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer.start(format!("RPC to {:?}", to)); + let fut = async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }; + tokio::spawn(fut.with_context(Context::current_with_span(span))) + }); + let mut resp_stream = requests.collect::>(); + + let mut successes = vec![]; + let mut errors = vec![]; + + let mut set_counters = vec![(0, 0); to_sets.len()]; + + while !resp_stream.is_empty() { + let (node, resp) = resp_stream.next().await.unwrap().unwrap(); + + match resp { + Ok(msg) => { + for set in peers.get(&node).unwrap().iter() { + set_counters[*set].0 += 1; + } + successes.push(msg); + } + Err(e) => { + for set in peers.get(&node).unwrap().iter() { + set_counters[*set].1 += 1; + } + errors.push(e); + } + } + + if set_counters.iter().all(|x| x.0 > quorum) { + // Success + + // Continue all other requets in background + tokio::spawn(async move { + resp_stream.collect::>>().await; + }); + + return Ok(successes); + } + + if set_counters + .iter() + .enumerate() + .any(|(i, x)| x.1 + quorum > to_sets[i].len()) + { + // Too many errors in this set, we know we won't get a quorum + break; + } + } + + // Failure, could not get quorum + let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); + Err(Error::Quorum(quorum, successes.len(), peers.len(), errors)) + } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 002cfbf4..ef788749 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -230,7 +230,7 @@ impl TableGc { .rpc_helper() .try_call_many( &self.endpoint, - &nodes[..], + &nodes, GcRpc::Update(updates), RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) @@ -251,7 +251,7 @@ impl TableGc { .rpc_helper() .try_call_many( &self.endpoint, - &nodes[..], + &nodes, GcRpc::DeleteIfEqualHash(deletes), RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) diff --git a/src/table/table.rs b/src/table/table.rs index bf08d5a0..c2efaeaf 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,17 +119,16 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - // TODO: use write sets - let who = self.data.replication.storage_nodes(&hash); + let who = self.data.replication.write_sets(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.data.replication.write_quorum()), @@ -243,11 +242,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?; @@ -339,11 +337,10 @@ impl Table { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()) - .interrupt_after_quorum(true), + .with_quorum(self.data.replication.read_quorum()), ) .await?;