table sync: use write quorums to report global success or failure of sync

This commit is contained in:
Alex 2023-12-07 11:16:10 +01:00
parent 95eb13eb08
commit d90de365b3
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
6 changed files with 36 additions and 28 deletions

View file

@ -180,7 +180,7 @@ impl LayoutHelper {
ret ret
} }
pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout() self.layout()
.versions .versions
.iter() .iter()

View file

@ -139,7 +139,7 @@ impl LayoutManager {
pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> { pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
let layout = self.layout(); let layout = self.layout();
let version = layout.current().version; let version = layout.current().version;
let nodes = layout.write_sets_of(position); let nodes = layout.storage_sets_of(position);
layout layout
.ack_lock .ack_lock
.get(&version) .get(&version)

View file

@ -1,4 +1,3 @@
use std::iter::FromIterator;
use std::sync::Arc; use std::sync::Arc;
use garage_rpc::layout::*; use garage_rpc::layout::*;
@ -69,7 +68,7 @@ impl TableReplication for TableFullReplication {
partition: 0u16, partition: 0u16,
first_hash: [0u8; 32].into(), first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(), last_hash: [0xff; 32].into(),
storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()), storage_sets: vec![layout.current().all_nodes().to_vec()],
}], }],
} }
} }

View file

@ -40,5 +40,5 @@ pub struct SyncPartition {
pub partition: Partition, pub partition: Partition,
pub first_hash: Hash, pub first_hash: Hash,
pub last_hash: Hash, pub last_hash: Hash,
pub storage_nodes: Vec<Uuid>, pub storage_sets: Vec<Vec<Uuid>>,
} }

View file

@ -60,12 +60,12 @@ impl TableReplication for TableShardedReplication {
.current() .current()
.partitions() .partitions()
.map(|(partition, first_hash)| { .map(|(partition, first_hash)| {
let storage_nodes = layout.storage_nodes_of(&first_hash); let storage_sets = layout.storage_sets_of(&first_hash);
SyncPartition { SyncPartition {
partition, partition,
first_hash, first_hash,
last_hash: [0u8; 32].into(), // filled in just after last_hash: [0u8; 32].into(), // filled in just after
storage_nodes, storage_sets,
} }
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();

View file

@ -18,6 +18,7 @@ use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage}; use garage_util::error::{Error, OkOrMessage};
use garage_rpc::layout::*; use garage_rpc::layout::*;
use garage_rpc::rpc_helper::QuorumSetResultTracker;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -106,44 +107,52 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
must_exit: &mut watch::Receiver<bool>, must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let my_id = self.system.id; let my_id = self.system.id;
let retain = partition.storage_nodes.contains(&my_id); let retain = partition.storage_sets.iter().any(|x| x.contains(&my_id));
if retain { if retain {
debug!( debug!(
"({}) Syncing {:?} with {:?}...", "({}) Syncing {:?} with {:?}...",
F::TABLE_NAME, F::TABLE_NAME,
partition, partition,
partition.storage_nodes partition.storage_sets
); );
let mut sync_futures = partition let mut result_tracker = QuorumSetResultTracker::new(
.storage_nodes &partition.storage_sets,
self.data.replication.write_quorum(),
);
let mut sync_futures = result_tracker
.nodes
.iter() .iter()
.filter(|node| **node != my_id) .map(|(node, _)| *node)
.map(|node| { .map(|node| {
self.clone() let must_exit = must_exit.clone();
.do_sync_with(&partition, *node, must_exit.clone()) async move {
if node == my_id {
(node, Ok(()))
} else {
(node, self.do_sync_with(&partition, node, must_exit).await)
}
}
}) })
.collect::<FuturesUnordered<_>>(); .collect::<FuturesUnordered<_>>();
let mut n_errors = 0; while let Some((node, res)) = sync_futures.next().await {
while let Some(r) = sync_futures.next().await { if let Err(e) = &res {
if let Err(e) = r { warn!("({}) Sync error with {:?}: {}", F::TABLE_NAME, node, e);
n_errors += 1;
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
} }
result_tracker.register_result(node, res);
} }
if n_errors > 0 {
return Err(Error::Message(format!( if result_tracker.too_many_failures() {
"Sync failed with {} nodes.", return Err(result_tracker.quorum_error());
n_errors } else {
))); Ok(())
} }
} else { } else {
self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit) self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
.await?; .await
} }
Ok(())
} }
// Offload partition: this partition is not something we are storing, // Offload partition: this partition is not something we are storing,
@ -264,7 +273,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
} }
async fn do_sync_with( async fn do_sync_with(
self: Arc<Self>, self: &Arc<Self>,
partition: &SyncPartition, partition: &SyncPartition,
who: Uuid, who: Uuid,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,