diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index c65831a21..174650190 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -352,6 +352,12 @@ impl AsRef for WriteLock { } } +impl AsMut for WriteLock { + fn as_mut(&mut self) -> &mut T { + &mut self.value + } +} + impl Drop for WriteLock { fn drop(&mut self) { let layout = self.layout_manager.layout(); // acquire read lock diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index f71f5ae74..c6dcbe75a 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -484,15 +484,10 @@ impl RpcHelper { // Peers may appear in many quorum sets. Here, build a list of peers, // mapping to the index of the quorum sets in which they appear. - let mut peers = HashMap::>::new(); - for (i, set) in to_sets.iter().enumerate() { - for peer in set.iter() { - peers.entry(*peer).or_default().push(i); - } - } + let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum); // Send one request to each peer of the quorum sets - let requests = peers.iter().map(|(peer, _)| { + let requests = result_tracker.nodes.iter().map(|(peer, _)| { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); @@ -501,52 +496,25 @@ impl RpcHelper { }); let mut resp_stream = requests.collect::>(); - // Success and error responses will be collected in these two vectors - let mut successes = vec![]; - let mut errors = vec![]; - - // `set_counters` is used to keep track of how many success and error - // responses are received within each quorum set. When a node returns - // its response, it counts as a sucess/an error for all of the quorum - // sets which it is part of. - let mut set_counters = vec![(0, 0); to_sets.len()]; - // Drive requests to completion while let Some((node, resp)) = resp_stream.next().await { // Store the response in the correct vector and increment the // appropriate counters - match resp { - Ok(msg) => { - for set in peers.get(&node).unwrap().iter() { - set_counters[*set].0 += 1; - } - successes.push(msg); - } - Err(e) => { - for set in peers.get(&node).unwrap().iter() { - set_counters[*set].1 += 1; - } - errors.push(e); - } - } + result_tracker.register_result(node, resp); // If we have a quorum of ok in all quorum sets, then it's a success! - if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + if result_tracker.all_quorums_ok() { // Continue all other requets in background tokio::spawn(async move { resp_stream.collect::)>>().await; }); - return Ok(successes); + return Ok(result_tracker.success_values()); } // If there is a quorum set for which too many errors were received, // we know it's impossible to get a quorum, so return immediately. - if set_counters - .iter() - .enumerate() - .any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len()) - { + if result_tracker.too_many_failures() { break; } } @@ -563,13 +531,104 @@ impl RpcHelper { // running request handler.) // Failure, could not get quorum - let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::Quorum( - quorum, - Some(to_sets.len()), - successes.len(), - peers.len(), - errors, - )) + Err(result_tracker.quorum_error()) + } +} + +// ------- utility for tracking successes/errors among write sets -------- + +pub struct QuorumSetResultTracker { + // The set of nodes and the quorum sets they belong to + pub nodes: HashMap>, + pub quorum: usize, + + // The success and error responses received + pub successes: Vec<(Uuid, S)>, + pub failures: Vec<(Uuid, E)>, + + // The counters for successes and failures in each set + pub success_counters: Box<[usize]>, + pub failure_counters: Box<[usize]>, + pub set_lens: Box<[usize]>, +} + +impl QuorumSetResultTracker { + pub fn new(sets: &[A], quorum: usize) -> Self + where + A: AsRef<[Uuid]>, + { + let mut nodes = HashMap::>::new(); + for (i, set) in sets.iter().enumerate() { + for node in set.as_ref().iter() { + nodes.entry(*node).or_default().push(i); + } + } + + let num_nodes = nodes.len(); + Self { + nodes, + quorum, + successes: Vec::with_capacity(num_nodes), + failures: vec![], + success_counters: vec![0; sets.len()].into_boxed_slice(), + failure_counters: vec![0; sets.len()].into_boxed_slice(), + set_lens: sets + .iter() + .map(|x| x.as_ref().len()) + .collect::>() + .into_boxed_slice(), + } + } + + pub fn register_result(&mut self, node: Uuid, result: Result) { + match result { + Ok(s) => { + self.successes.push((node, s)); + for set in self.nodes.get(&node).unwrap().iter() { + self.success_counters[*set] += 1; + } + } + Err(e) => { + self.failures.push((node, e)); + for set in self.nodes.get(&node).unwrap().iter() { + self.failure_counters[*set] += 1; + } + } + } + } + + pub fn all_quorums_ok(&self) -> bool { + self.success_counters + .iter() + .all(|ok_cnt| *ok_cnt >= self.quorum) + } + + pub fn too_many_failures(&self) -> bool { + self.failure_counters + .iter() + .zip(self.set_lens.iter()) + .any(|(err_cnt, set_len)| *err_cnt + self.quorum > *set_len) + } + + pub fn success_values(self) -> Vec { + self.successes + .into_iter() + .map(|(_, x)| x) + .collect::>() + } + + pub fn quorum_error(self) -> Error { + let errors = self + .failures + .iter() + .map(|(n, e)| format!("{:?}: {}", n, e)) + .collect::>(); + Error::Quorum( + self.quorum, + Some(self.set_lens.len()), + self.successes.len(), + self.nodes.len(), + errors, + ) } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index a4e701bbe..db11ff5f1 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -3,7 +3,7 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { - type WriteSets: AsRef>> + Send + Sync + 'static; + type WriteSets: AsRef>> + AsMut>> + Send + Sync + 'static; // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/table.rs b/src/table/table.rs index 7d1ff31c9..6508cf5df 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -20,6 +20,7 @@ use garage_util::error::Error; use garage_util::metrics::RecordDuration; use garage_util::migrate::Migrate; +use garage_rpc::rpc_helper::QuorumSetResultTracker; use garage_rpc::system::System; use garage_rpc::*; @@ -180,10 +181,6 @@ impl Table { // a quorum of nodes has answered OK, then the insert has succeeded and // consistency properties (read-after-write) are preserved. - // Some code here might feel redundant with RpcHelper::try_write_many_sets, - // but I think deduplicating could lead to more spaghetti instead of - // improving the readability, so I'm leaving as is. - let quorum = self.data.replication.write_quorum(); // Serialize all entries and compute the write sets for each of them. @@ -197,7 +194,10 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let write_sets = self.data.replication.write_sets(&hash); + let mut write_sets = self.data.replication.write_sets(&hash); + for set in write_sets.as_mut().iter_mut() { + set.sort(); + } let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); entries_vec.push((write_sets, e_enc)); } @@ -212,12 +212,8 @@ impl Table { .collect::>(); write_sets.sort(); write_sets.dedup(); - let mut write_set_index = HashMap::<&Uuid, Vec>::new(); - for (i, write_set) in write_sets.iter().enumerate() { - for node in write_set.iter() { - write_set_index.entry(node).or_default().push(i); - } - } + + let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum); // Build a map of all nodes to the entries that must be sent to that node. let mut call_list: HashMap> = HashMap::new(); @@ -230,7 +226,6 @@ impl Table { } // Build futures to actually perform each of the corresponding RPC calls - let call_count = call_list.len(); let call_futures = call_list.into_iter().map(|(node, entries)| { let this = self.clone(); let tracer = opentelemetry::global::tracer("garage"); @@ -254,27 +249,11 @@ impl Table { // Run all requests in parallel thanks to FuturesUnordered, and collect results. let mut resps = call_futures.collect::>(); - let mut set_counters = vec![(0, 0); write_sets.len()]; - let mut successes = 0; - let mut errors = vec![]; while let Some((node, resp)) = resps.next().await { - match resp { - Ok(_) => { - successes += 1; - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].0 += 1; - } - } - Err(e) => { - errors.push(e); - for set in write_set_index.get(&node).unwrap().iter() { - set_counters[*set].1 += 1; - } - } - } + result_tracker.register_result(node, resp.map(|_| ())); - if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) { + if result_tracker.all_quorums_ok() { // Success // Continue all other requests in background @@ -285,25 +264,14 @@ impl Table { return Ok(()); } - if set_counters - .iter() - .enumerate() - .any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len()) - { + if result_tracker.too_many_failures() { // Too many errors in this set, we know we won't get a quorum break; } } // Failure, could not get quorum within at least one set - let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::Quorum( - quorum, - Some(write_sets.len()), - successes, - call_count, - errors, - )) + Err(result_tracker.quorum_error()) } pub async fn get(