rpc: refactor result tracking for quorum sets
This commit is contained in:
parent
c8356a91d9
commit
95eb13eb08
4 changed files with 123 additions and 90 deletions
|
@ -352,6 +352,12 @@ impl<T> AsRef<T> for WriteLock<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> AsMut<T> for WriteLock<T> {
|
||||||
|
fn as_mut(&mut self) -> &mut T {
|
||||||
|
&mut self.value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> Drop for WriteLock<T> {
|
impl<T> Drop for WriteLock<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let layout = self.layout_manager.layout(); // acquire read lock
|
let layout = self.layout_manager.layout(); // acquire read lock
|
||||||
|
|
|
@ -484,15 +484,10 @@ impl RpcHelper {
|
||||||
|
|
||||||
// Peers may appear in many quorum sets. Here, build a list of peers,
|
// Peers may appear in many quorum sets. Here, build a list of peers,
|
||||||
// mapping to the index of the quorum sets in which they appear.
|
// mapping to the index of the quorum sets in which they appear.
|
||||||
let mut peers = HashMap::<Uuid, Vec<usize>>::new();
|
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
|
||||||
for (i, set) in to_sets.iter().enumerate() {
|
|
||||||
for peer in set.iter() {
|
|
||||||
peers.entry(*peer).or_default().push(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send one request to each peer of the quorum sets
|
// Send one request to each peer of the quorum sets
|
||||||
let requests = peers.iter().map(|(peer, _)| {
|
let requests = result_tracker.nodes.iter().map(|(peer, _)| {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
let msg = msg.clone();
|
let msg = msg.clone();
|
||||||
let endpoint2 = endpoint.clone();
|
let endpoint2 = endpoint.clone();
|
||||||
|
@ -501,52 +496,25 @@ impl RpcHelper {
|
||||||
});
|
});
|
||||||
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
|
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
// Success and error responses will be collected in these two vectors
|
|
||||||
let mut successes = vec![];
|
|
||||||
let mut errors = vec![];
|
|
||||||
|
|
||||||
// `set_counters` is used to keep track of how many success and error
|
|
||||||
// responses are received within each quorum set. When a node returns
|
|
||||||
// its response, it counts as a sucess/an error for all of the quorum
|
|
||||||
// sets which it is part of.
|
|
||||||
let mut set_counters = vec![(0, 0); to_sets.len()];
|
|
||||||
|
|
||||||
// Drive requests to completion
|
// Drive requests to completion
|
||||||
while let Some((node, resp)) = resp_stream.next().await {
|
while let Some((node, resp)) = resp_stream.next().await {
|
||||||
// Store the response in the correct vector and increment the
|
// Store the response in the correct vector and increment the
|
||||||
// appropriate counters
|
// appropriate counters
|
||||||
match resp {
|
result_tracker.register_result(node, resp);
|
||||||
Ok(msg) => {
|
|
||||||
for set in peers.get(&node).unwrap().iter() {
|
|
||||||
set_counters[*set].0 += 1;
|
|
||||||
}
|
|
||||||
successes.push(msg);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
for set in peers.get(&node).unwrap().iter() {
|
|
||||||
set_counters[*set].1 += 1;
|
|
||||||
}
|
|
||||||
errors.push(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have a quorum of ok in all quorum sets, then it's a success!
|
// If we have a quorum of ok in all quorum sets, then it's a success!
|
||||||
if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
|
if result_tracker.all_quorums_ok() {
|
||||||
// Continue all other requets in background
|
// Continue all other requets in background
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
|
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
|
||||||
});
|
});
|
||||||
|
|
||||||
return Ok(successes);
|
return Ok(result_tracker.success_values());
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there is a quorum set for which too many errors were received,
|
// If there is a quorum set for which too many errors were received,
|
||||||
// we know it's impossible to get a quorum, so return immediately.
|
// we know it's impossible to get a quorum, so return immediately.
|
||||||
if set_counters
|
if result_tracker.too_many_failures() {
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.any(|(i, (_, err_cnt))| err_cnt + quorum > to_sets[i].len())
|
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -563,13 +531,104 @@ impl RpcHelper {
|
||||||
// running request handler.)
|
// running request handler.)
|
||||||
|
|
||||||
// Failure, could not get quorum
|
// Failure, could not get quorum
|
||||||
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
|
Err(result_tracker.quorum_error())
|
||||||
Err(Error::Quorum(
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------- utility for tracking successes/errors among write sets --------
|
||||||
|
|
||||||
|
pub struct QuorumSetResultTracker<S, E> {
|
||||||
|
// The set of nodes and the quorum sets they belong to
|
||||||
|
pub nodes: HashMap<Uuid, Vec<usize>>,
|
||||||
|
pub quorum: usize,
|
||||||
|
|
||||||
|
// The success and error responses received
|
||||||
|
pub successes: Vec<(Uuid, S)>,
|
||||||
|
pub failures: Vec<(Uuid, E)>,
|
||||||
|
|
||||||
|
// The counters for successes and failures in each set
|
||||||
|
pub success_counters: Box<[usize]>,
|
||||||
|
pub failure_counters: Box<[usize]>,
|
||||||
|
pub set_lens: Box<[usize]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, E: std::fmt::Display> QuorumSetResultTracker<S, E> {
|
||||||
|
pub fn new<A>(sets: &[A], quorum: usize) -> Self
|
||||||
|
where
|
||||||
|
A: AsRef<[Uuid]>,
|
||||||
|
{
|
||||||
|
let mut nodes = HashMap::<Uuid, Vec<usize>>::new();
|
||||||
|
for (i, set) in sets.iter().enumerate() {
|
||||||
|
for node in set.as_ref().iter() {
|
||||||
|
nodes.entry(*node).or_default().push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let num_nodes = nodes.len();
|
||||||
|
Self {
|
||||||
|
nodes,
|
||||||
quorum,
|
quorum,
|
||||||
Some(to_sets.len()),
|
successes: Vec::with_capacity(num_nodes),
|
||||||
successes.len(),
|
failures: vec![],
|
||||||
peers.len(),
|
success_counters: vec![0; sets.len()].into_boxed_slice(),
|
||||||
|
failure_counters: vec![0; sets.len()].into_boxed_slice(),
|
||||||
|
set_lens: sets
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.as_ref().len())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.into_boxed_slice(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_result(&mut self, node: Uuid, result: Result<S, E>) {
|
||||||
|
match result {
|
||||||
|
Ok(s) => {
|
||||||
|
self.successes.push((node, s));
|
||||||
|
for set in self.nodes.get(&node).unwrap().iter() {
|
||||||
|
self.success_counters[*set] += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
self.failures.push((node, e));
|
||||||
|
for set in self.nodes.get(&node).unwrap().iter() {
|
||||||
|
self.failure_counters[*set] += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn all_quorums_ok(&self) -> bool {
|
||||||
|
self.success_counters
|
||||||
|
.iter()
|
||||||
|
.all(|ok_cnt| *ok_cnt >= self.quorum)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn too_many_failures(&self) -> bool {
|
||||||
|
self.failure_counters
|
||||||
|
.iter()
|
||||||
|
.zip(self.set_lens.iter())
|
||||||
|
.any(|(err_cnt, set_len)| *err_cnt + self.quorum > *set_len)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn success_values(self) -> Vec<S> {
|
||||||
|
self.successes
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, x)| x)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn quorum_error(self) -> Error {
|
||||||
|
let errors = self
|
||||||
|
.failures
|
||||||
|
.iter()
|
||||||
|
.map(|(n, e)| format!("{:?}: {}", n, e))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
Error::Quorum(
|
||||||
|
self.quorum,
|
||||||
|
Some(self.set_lens.len()),
|
||||||
|
self.successes.len(),
|
||||||
|
self.nodes.len(),
|
||||||
errors,
|
errors,
|
||||||
))
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use garage_util::data::*;
|
||||||
|
|
||||||
/// Trait to describe how a table shall be replicated
|
/// Trait to describe how a table shall be replicated
|
||||||
pub trait TableReplication: Send + Sync + 'static {
|
pub trait TableReplication: Send + Sync + 'static {
|
||||||
type WriteSets: AsRef<Vec<Vec<Uuid>>> + Send + Sync + 'static;
|
type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
|
||||||
|
|
||||||
// See examples in table_sharded.rs and table_fullcopy.rs
|
// See examples in table_sharded.rs and table_fullcopy.rs
|
||||||
// To understand various replication methods
|
// To understand various replication methods
|
||||||
|
|
|
@ -20,6 +20,7 @@ use garage_util::error::Error;
|
||||||
use garage_util::metrics::RecordDuration;
|
use garage_util::metrics::RecordDuration;
|
||||||
use garage_util::migrate::Migrate;
|
use garage_util::migrate::Migrate;
|
||||||
|
|
||||||
|
use garage_rpc::rpc_helper::QuorumSetResultTracker;
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
|
@ -180,10 +181,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
// a quorum of nodes has answered OK, then the insert has succeeded and
|
// a quorum of nodes has answered OK, then the insert has succeeded and
|
||||||
// consistency properties (read-after-write) are preserved.
|
// consistency properties (read-after-write) are preserved.
|
||||||
|
|
||||||
// Some code here might feel redundant with RpcHelper::try_write_many_sets,
|
|
||||||
// but I think deduplicating could lead to more spaghetti instead of
|
|
||||||
// improving the readability, so I'm leaving as is.
|
|
||||||
|
|
||||||
let quorum = self.data.replication.write_quorum();
|
let quorum = self.data.replication.write_quorum();
|
||||||
|
|
||||||
// Serialize all entries and compute the write sets for each of them.
|
// Serialize all entries and compute the write sets for each of them.
|
||||||
|
@ -197,7 +194,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
for entry in entries.into_iter() {
|
for entry in entries.into_iter() {
|
||||||
let entry = entry.borrow();
|
let entry = entry.borrow();
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let write_sets = self.data.replication.write_sets(&hash);
|
let mut write_sets = self.data.replication.write_sets(&hash);
|
||||||
|
for set in write_sets.as_mut().iter_mut() {
|
||||||
|
set.sort();
|
||||||
|
}
|
||||||
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
||||||
entries_vec.push((write_sets, e_enc));
|
entries_vec.push((write_sets, e_enc));
|
||||||
}
|
}
|
||||||
|
@ -212,12 +212,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
.collect::<Vec<&[Uuid]>>();
|
.collect::<Vec<&[Uuid]>>();
|
||||||
write_sets.sort();
|
write_sets.sort();
|
||||||
write_sets.dedup();
|
write_sets.dedup();
|
||||||
let mut write_set_index = HashMap::<&Uuid, Vec<usize>>::new();
|
|
||||||
for (i, write_set) in write_sets.iter().enumerate() {
|
let mut result_tracker = QuorumSetResultTracker::new(&write_sets, quorum);
|
||||||
for node in write_set.iter() {
|
|
||||||
write_set_index.entry(node).or_default().push(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build a map of all nodes to the entries that must be sent to that node.
|
// Build a map of all nodes to the entries that must be sent to that node.
|
||||||
let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
|
let mut call_list: HashMap<Uuid, Vec<_>> = HashMap::new();
|
||||||
|
@ -230,7 +226,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build futures to actually perform each of the corresponding RPC calls
|
// Build futures to actually perform each of the corresponding RPC calls
|
||||||
let call_count = call_list.len();
|
|
||||||
let call_futures = call_list.into_iter().map(|(node, entries)| {
|
let call_futures = call_list.into_iter().map(|(node, entries)| {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
let tracer = opentelemetry::global::tracer("garage");
|
let tracer = opentelemetry::global::tracer("garage");
|
||||||
|
@ -254,27 +249,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
|
|
||||||
// Run all requests in parallel thanks to FuturesUnordered, and collect results.
|
// Run all requests in parallel thanks to FuturesUnordered, and collect results.
|
||||||
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
||||||
let mut set_counters = vec![(0, 0); write_sets.len()];
|
|
||||||
let mut successes = 0;
|
|
||||||
let mut errors = vec![];
|
|
||||||
|
|
||||||
while let Some((node, resp)) = resps.next().await {
|
while let Some((node, resp)) = resps.next().await {
|
||||||
match resp {
|
result_tracker.register_result(node, resp.map(|_| ()));
|
||||||
Ok(_) => {
|
|
||||||
successes += 1;
|
|
||||||
for set in write_set_index.get(&node).unwrap().iter() {
|
|
||||||
set_counters[*set].0 += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
errors.push(e);
|
|
||||||
for set in write_set_index.get(&node).unwrap().iter() {
|
|
||||||
set_counters[*set].1 += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if set_counters.iter().all(|(ok_cnt, _)| *ok_cnt >= quorum) {
|
if result_tracker.all_quorums_ok() {
|
||||||
// Success
|
// Success
|
||||||
|
|
||||||
// Continue all other requests in background
|
// Continue all other requests in background
|
||||||
|
@ -285,25 +264,14 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if set_counters
|
if result_tracker.too_many_failures() {
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.any(|(i, (_, err_cnt))| err_cnt + quorum > write_sets[i].len())
|
|
||||||
{
|
|
||||||
// Too many errors in this set, we know we won't get a quorum
|
// Too many errors in this set, we know we won't get a quorum
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Failure, could not get quorum within at least one set
|
// Failure, could not get quorum within at least one set
|
||||||
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
|
Err(result_tracker.quorum_error())
|
||||||
Err(Error::Quorum(
|
|
||||||
quorum,
|
|
||||||
Some(write_sets.len()),
|
|
||||||
successes,
|
|
||||||
call_count,
|
|
||||||
errors,
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(
|
pub async fn get(
|
||||||
|
|
Loading…
Reference in a new issue