From ce89d1ddabe3b9e638b0173949726522ae9a0311 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sat, 11 Nov 2023 12:08:32 +0100 Subject: [PATCH] table sync: adapt to new layout history --- src/rpc/layout/history.rs | 21 +++- src/rpc/layout/manager.rs | 1 + src/rpc/layout/version.rs | 16 ++- src/rpc/system.rs | 2 +- src/table/replication/fullcopy.rs | 25 +++- src/table/replication/parameters.rs | 19 ++- src/table/replication/sharded.rs | 39 +++++- src/table/sync.rs | 178 +++++++++++----------------- 8 files changed, 172 insertions(+), 129 deletions(-) diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dbb02269..185dbb27 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -47,11 +47,19 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- - pub fn max_ack(&self) -> u64 { + pub fn all_ack(&self) -> u64 { self.calculate_global_min(&self.update_trackers.ack_map) } - pub fn all_storage_nodes(&self) -> HashSet { + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn sync_versions(&self) -> (u64, u64, u64) { + (self.current().version, self.all_ack(), self.min_stored()) + } + + pub fn all_nongateway_nodes(&self) -> HashSet { // TODO: cache this self.versions .iter() @@ -71,11 +79,10 @@ impl LayoutHistory { version.nodes_of(position, version.replication_factor) } - pub fn write_sets_of(&self, position: &Hash) -> Vec> { + pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator> + 'a { self.versions .iter() - .map(|x| x.nodes_of(position, x.replication_factor)) - .collect::>() + .map(move |x| x.nodes_of(position, x.replication_factor)) } // ------------------ update tracking --------------- @@ -129,7 +136,9 @@ impl LayoutHistory { } pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - let storage_nodes = self.all_storage_nodes(); + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes(); storage_nodes .iter() .map(|x| tracker.0.get(x).copied().unwrap_or(0)) diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index b0302b12..7d60bae6 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -92,6 +92,7 @@ impl LayoutManager { persist_cluster_layout, layout, change_notify, + table_sync_version: Mutex::new(HashMap::new()), system_endpoint, rpc_helper, })) diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 8133672a..f45a3c35 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -98,15 +98,13 @@ impl LayoutVersion { } /// Get the list of partitions and the first hash of a partition key that would fall in it - pub fn partitions(&self) -> Vec<(Partition, Hash)> { - (0..(1 << PARTITION_BITS)) - .map(|i| { - let top = (i as u16) << (16 - PARTITION_BITS); - let mut location = [0u8; 32]; - location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - (i as u16, Hash::from(location)) - }) - .collect::>() + pub fn partitions(&self) -> impl Iterator + '_ { + (0..(1 << PARTITION_BITS)).map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) } /// Return the n servers in which data for this hash should be replicated diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 6ce13d0d..3418600b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -442,7 +442,7 @@ impl System { .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count(); - let partitions = layout.current().partitions(); + let partitions = layout.current().partitions().collect::>(); let partitions_n_up = partitions .iter() .map(|(_, h)| { diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a5c83d0f..5653a229 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,3 +1,4 @@ +use std::iter::FromIterator; use std::sync::Arc; use garage_rpc::layout::*; @@ -6,10 +7,17 @@ use garage_util::data::*; use crate::replication::*; +// TODO: find a way to track layout changes for this as well +// The hard thing is that this data is stored also on gateway nodes, +// whereas sharded data is stored only on non-Gateway nodes (storage nodes) +// Also we want to be more tolerant to failures of gateways so we don't +// want to do too much holding back of data when progress of gateway +// nodes is not reported in the layout history's ack/sync/sync_ack maps. + /// Full replication schema: all nodes store everything -/// Writes are disseminated in an epidemic manner in the network /// Advantage: do all reads locally, extremely fast /// Inconvenient: only suitable to reasonably small tables +/// Inconvenient: if some writes fail, nodes will read outdated data #[derive(Clone)] pub struct TableFullReplication { /// The membership manager of this node @@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication { fn partition_of(&self, _hash: &Hash) -> Partition { 0u16 } - fn partitions(&self) -> Vec<(Partition, Hash)> { - vec![(0u16, [0u8; 32].into())] + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.current().version; + SyncPartitions { + layout_version, + partitions: vec![SyncPartition { + partition: 0u16, + first_hash: [0u8; 32].into(), + last_hash: [0xff; 32].into(), + storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), + }], + } } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 19b306f2..2a7d3585 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static { // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of existing partitions - fn partitions(&self) -> Vec<(Partition, Hash)>; + + /// List of partitions and nodes to sync with in current layout + fn sync_partitions(&self) -> SyncPartitions; +} + +#[derive(Debug)] +pub struct SyncPartitions { + pub layout_version: u64, + pub partitions: Vec, +} + +#[derive(Debug)] +pub struct SyncPartition { + pub partition: Partition, + pub first_hash: Hash, + pub last_hash: Hash, + pub storage_nodes: Vec, } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 793d87fd..f02b1d66 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication { fn partition_of(&self, hash: &Hash) -> Partition { self.system.cluster_layout().current().partition_of(hash) } - fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().current().partitions() + + fn sync_partitions(&self) -> SyncPartitions { + let layout = self.system.cluster_layout(); + let layout_version = layout.all_ack(); + + let mut partitions = layout + .current() + .partitions() + .map(|(partition, first_hash)| { + let mut storage_nodes = layout + .write_sets_of(&first_hash) + .map(|x| x.into_iter()) + .flatten() + .collect::>(); + storage_nodes.sort(); + storage_nodes.dedup(); + SyncPartition { + partition, + first_hash, + last_hash: [0u8; 32].into(), // filled in just after + storage_nodes, + } + }) + .collect::>(); + + for i in 0..partitions.len() { + partitions[i].last_hash = if i + 1 < partitions.len() { + partitions[i + 1].first_hash + } else { + [0xFFu8; 32].into() + }; + } + + SyncPartitions { + layout_version, + partitions, + } } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 4355bd9e..43636faa 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; -use rand::Rng; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; @@ -52,16 +52,6 @@ impl Rpc for SyncRpc { type Response = Result; } -#[derive(Debug, Clone)] -struct TodoPartition { - partition: Partition, - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? - retain: bool, -} - impl TableSyncer { pub(crate) fn new( system: Arc, @@ -92,9 +82,9 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), layout_notify: self.system.layout_notify(), - layout_version: self.system.cluster_layout().current().version, + layout_versions: self.system.cluster_layout().sync_versions(), add_full_sync_rx, - todo: vec![], + todo: None, next_full_sync: Instant::now() + Duration::from_secs(20), }); } @@ -112,31 +102,26 @@ impl TableSyncer { async fn sync_partition( self: &Arc, - partition: &TodoPartition, + partition: &SyncPartition, must_exit: &mut watch::Receiver, ) -> Result<(), Error> { - if partition.retain { - let my_id = self.system.id; - - let nodes = self - .data - .replication - .write_nodes(&partition.begin) - .into_iter() - .filter(|node| *node != my_id) - .collect::>(); + let my_id = self.system.id; + let retain = partition.storage_nodes.contains(&my_id); + if retain { debug!( "({}) Syncing {:?} with {:?}...", F::TABLE_NAME, partition, - nodes + partition.storage_nodes ); - let mut sync_futures = nodes + let mut sync_futures = partition + .storage_nodes .iter() + .filter(|node| **node != my_id) .map(|node| { self.clone() - .do_sync_with(partition.clone(), *node, must_exit.clone()) + .do_sync_with(&partition, *node, must_exit.clone()) }) .collect::>(); @@ -147,14 +132,14 @@ impl TableSyncer { warn!("({}) Sync error: {}", F::TABLE_NAME, e); } } - if n_errors > self.data.replication.max_write_errors() { + if n_errors > 0 { return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes + "Sync failed with {} nodes.", + n_errors ))); } } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) + self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit) .await?; } @@ -285,7 +270,7 @@ impl TableSyncer { async fn do_sync_with( self: Arc, - partition: TodoPartition, + partition: &SyncPartition, who: Uuid, must_exit: watch::Receiver, ) -> Result<(), Error> { @@ -492,76 +477,23 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, + layout_notify: Arc, - layout_version: u64, + layout_versions: (u64, u64, u64), + add_full_sync_rx: mpsc::UnboundedReceiver<()>, - todo: Vec, next_full_sync: Instant, + + todo: Option, } impl SyncWorker { fn add_full_sync(&mut self) { - let system = &self.syncer.system; - let data = &self.syncer.data; - - let my_id = system.id; - - self.todo.clear(); - - let partitions = data.replication.partitions(); - - for i in 0..partitions.len() { - let begin = partitions[i].1; - - let end = if i + 1 < partitions.len() { - partitions[i + 1].1 - } else { - [0xFFu8; 32].into() - }; - - let nodes = data.replication.write_nodes(&begin); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - match data.store.range(begin..end) { - Ok(mut iter) => { - if iter.next().is_none() { - continue; - } - } - Err(e) => { - warn!("DB error in add_full_sync: {}", e); - continue; - } - } - } - - self.todo.push(TodoPartition { - partition: partitions[i].0, - begin, - end, - retain, - }); - } - + let mut partitions = self.syncer.data.replication.sync_partitions(); + partitions.partitions.shuffle(&mut thread_rng()); + self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } - - fn pop_task(&mut self) -> Option { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range(0..self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } } #[async_trait] @@ -572,18 +504,46 @@ impl Worker for SyncWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.todo.len() as u64), + queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { - if let Some(partition) = self.pop_task() { - self.syncer.sync_partition(&partition, must_exit).await?; - Ok(WorkerState::Busy) - } else { - Ok(WorkerState::Idle) + if let Some(todo) = &mut self.todo { + let partition = todo.partitions.pop().unwrap(); + + // process partition + if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await { + error!( + "{}: Failed to sync partition {:?}: {}", + F::TABLE_NAME, + partition, + e + ); + // if error, put partition back at the other side of the queue, + // so that other partitions will be tried in the meantime + todo.partitions.insert(0, partition); + // TODO: returning an error here will cause the background job worker + // to delay this task for some time, but maybe we don't want to + // delay it if there are lots of failures from nodes that are gone + // (we also don't want zero delays as that will cause lots of useless retries) + return Err(e); + } + + // done + if !todo.partitions.is_empty() { + return Ok(WorkerState::Busy); + } + + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); } + + self.todo = None; + Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -594,10 +554,16 @@ impl Worker for SyncWorker { } }, _ = self.layout_notify.notified() => { - let new_version = self.syncer.system.cluster_layout().current().version; - if new_version > self.layout_version { - self.layout_version = new_version; - debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != self.layout_versions { + self.layout_versions = layout_versions; + debug!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); self.add_full_sync(); } }, @@ -605,9 +571,9 @@ impl Worker for SyncWorker { self.add_full_sync(); } } - match self.todo.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Idle, + match self.todo.is_some() { + true => WorkerState::Busy, + false => WorkerState::Idle, } } }