From 2f2a96b51d4061224caeec434273aabb4208f9d6 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 25 Mar 2025 11:05:00 +0100
Subject: [PATCH] layout & replication mode refactoring

---
 src/rpc/layout/helper.rs         | 70 ++++++++++++--------------------
 src/rpc/layout/history.rs        |  4 +-
 src/rpc/layout/manager.rs        |  8 +++-
 src/rpc/layout/version.rs        |  4 +-
 src/rpc/rpc_helper.rs            |  4 +-
 src/rpc/system.rs                |  5 +--
 src/table/replication/sharded.rs | 21 ++++++++--
 7 files changed, 55 insertions(+), 61 deletions(-)

diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index c08a5629..482a2eea 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -149,14 +149,27 @@ impl LayoutHelper {
 		self.layout.as_ref().unwrap()
 	}
 
+	/// Returns the current layout version
 	pub fn current(&self) -> &LayoutVersion {
 		self.inner().current()
 	}
 
+	/// Returns all layout versions currently active in the cluster
 	pub fn versions(&self) -> &[LayoutVersion] {
 		&self.inner().versions
 	}
 
+	/// Returns the latest layout version from which it is safe to read data,
+	/// i.e. the version whose version number is sync_map_min
+	pub fn read_version(&self) -> &LayoutVersion {
+		let sync_min = self.sync_map_min;
+		self.versions()
+			.iter()
+			.find(|x| x.version == sync_min)
+			.or(self.versions().last())
+			.unwrap()
+	}
+
 	pub fn is_check_ok(&self) -> bool {
 		self.is_check_ok
 	}
@@ -173,6 +186,16 @@ impl LayoutHelper {
 		&self.all_nongateway_nodes
 	}
 
+	/// Returns the set of nodes for storing this hash in the current layout version.
+	///
+	/// Used by the block manager only: data blocks are immutable, so we don't have
+	/// to coordinate between the old and new sets of nodes when the layout changes.
+	/// As soon as the layout change is effective, blocks can be moved to the new
+	/// set of nodes.
+	pub fn current_storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
+		self.current().nodes_of(hash).collect()
+	}
+
 	pub fn ack_map_min(&self) -> u64 {
 		self.ack_map_min
 	}
@@ -181,6 +204,8 @@ impl LayoutHelper {
 		self.sync_map_min
 	}
 
+	// ---- helpers for layout synchronization ----
+
 	pub fn sync_digest(&self) -> SyncLayoutDigest {
 		SyncLayoutDigest {
 			current: self.current().version,
@@ -189,50 +214,7 @@ impl LayoutHelper {
 		}
 	}
 
-	pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let sync_min = self.sync_map_min;
-		let version = self
-			.versions()
-			.iter()
-			.find(|x| x.version == sync_min)
-			.or(self.versions().last())
-			.unwrap();
-		version
-			.nodes_of(position, version.replication_factor)
-			.collect()
-	}
-
-	pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
-		self.versions()
-			.iter()
-			.map(|x| x.nodes_of(position, x.replication_factor).collect())
-			.collect()
-	}
-
-	pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let mut ret = vec![];
-		for version in self.versions().iter() {
-			ret.extend(version.nodes_of(position, version.replication_factor));
-		}
-		ret.sort();
-		ret.dedup();
-		ret
-	}
-
-	pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let ver = self.current();
-		ver.nodes_of(position, ver.replication_factor).collect()
-	}
-
-	pub fn trackers_hash(&self) -> Hash {
-		self.trackers_hash
-	}
-
-	pub fn staging_hash(&self) -> Hash {
-		self.staging_hash
-	}
-
-	pub fn digest(&self) -> RpcLayoutDigest {
+	pub(crate) fn digest(&self) -> RpcLayoutDigest {
 		RpcLayoutDigest {
 			current_version: self.current().version,
 			active_versions: self.versions().len(),
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 16c32fb2..1e6bc84b 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -180,9 +180,7 @@ impl LayoutHistory {
 
 			// Determine set of nodes for partition p in layout version v.
 			// Sort the node set to avoid duplicate computations.
-			let mut set = v
-				.nodes_of(&p_hash, v.replication_factor)
-				.collect::<Vec<Uuid>>();
+			let mut set = v.nodes_of(&p_hash).collect::<Vec<Uuid>>();
 			set.sort();
 
 			// If this set was already processed, skip it.
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 21907ec7..55b67a27 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -143,10 +143,14 @@ impl LayoutManager {
 
 	// ---- ACK LOCKING ----
 
-	pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+	pub fn write_sets_of(self: &Arc<Self>, hash: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
 		let layout = self.layout();
 		let version = layout.current().version;
-		let nodes = layout.storage_sets_of(position);
+		let nodes = layout
+			.versions()
+			.iter()
+			.map(|x| x.nodes_of(hash).collect())
+			.collect();
 		layout
 			.ack_lock
 			.get(&version)
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 90a51de7..fdcccc46 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -114,9 +114,7 @@ impl LayoutVersion {
 	}
 
 	/// Return the n servers in which data for this hash should be replicated
-	pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
-		assert_eq!(n, self.replication_factor);
-
+	pub fn nodes_of(&self, position: &Hash) -> impl Iterator<Item = Uuid> + '_ {
 		let data = &self.ring_assignment_data;
 
 		let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 2505c2ce..87fff5d6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -573,7 +573,7 @@ impl RpcHelper {
 		// Compute, for each layout version, the set of nodes that might store
 		// the block, and put them in their preferred order as of `request_order`.
 		let mut vernodes = layout.versions().iter().map(|ver| {
-			let nodes = ver.nodes_of(position, ver.replication_factor);
+			let nodes = ver.nodes_of(position);
 			rpc_helper.request_order(layout.current(), nodes)
 		});
 
@@ -607,7 +607,7 @@ impl RpcHelper {
 		// Second step: add nodes of older layout versions
 		let old_ver_iter = layout.inner().old_versions.iter().rev();
 		for ver in old_ver_iter {
-			let nodes = ver.nodes_of(position, ver.replication_factor);
+			let nodes = ver.nodes_of(position);
 			for node in rpc_helper.request_order(layout.current(), nodes) {
 				if !ret.contains(&node) {
 					ret.push(node);
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 198a5f6b..800b37f3 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -475,10 +475,7 @@ impl System {
 		let mut partitions_quorum = 0;
 		let mut partitions_all_ok = 0;
 		for (_, hash) in partitions.iter() {
-			let mut write_sets = layout
-				.versions()
-				.iter()
-				.map(|x| x.nodes_of(hash, x.replication_factor));
+			let mut write_sets = layout.versions().iter().map(|x| x.nodes_of(hash));
 			let has_quorum = write_sets
 				.clone()
 				.all(|set| set.filter(|x| node_up(x)).count() >= quorum);
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index e0245949..17a848fe 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -28,11 +28,22 @@ impl TableReplication for TableShardedReplication {
 	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
 	fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().storage_nodes_of(hash)
+		let layout = self.system.cluster_layout();
+		let mut ret = vec![];
+		for version in layout.versions().iter() {
+			ret.extend(version.nodes_of(hash));
+		}
+		ret.sort();
+		ret.dedup();
+		ret
 	}
 
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().read_nodes_of(hash)
+		self.system
+			.cluster_layout()
+			.read_version()
+			.nodes_of(hash)
+			.collect()
 	}
 
 	fn read_quorum(&self) -> usize {
 		self.read_quorum
@@ -57,7 +68,11 @@ impl TableReplication for TableShardedReplication {
 			.current()
 			.partitions()
 			.map(|(partition, first_hash)| {
-				let storage_sets = layout.storage_sets_of(&first_hash);
+				let storage_sets = layout
+					.versions()
+					.iter()
+					.map(|x| x.nodes_of(&first_hash).collect())
+					.collect();
 				SyncPartition {
 					partition,
 					first_hash,
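
Note (not part of the patch): the version-selection rule that the new
read_version() implements can be illustrated in isolation. A minimal sketch,
where `Version` and the free function `read_version` are simplified stand-ins
for LayoutVersion and the LayoutHelper method (which reads self.sync_map_min):

// Sketch only: `Version` stands in for Garage's LayoutVersion.
struct Version {
	version: u64,
}

// Pick the newest layout version that is safe to read from: the one whose
// version number equals sync_map_min, falling back to the latest version
// if no exact match exists.
fn read_version(versions: &[Version], sync_map_min: u64) -> &Version {
	versions
		.iter()
		.find(|v| v.version == sync_map_min)
		.or(versions.last())
		.unwrap()
}

fn main() {
	let versions = vec![Version { version: 3 }, Version { version: 4 }];
	// All nodes have synced up to version 3: reads target version 3.
	assert_eq!(read_version(&versions, 3).version, 3);
	// No version numbered 5 exists: fall back to the latest (4).
	assert_eq!(read_version(&versions, 5).version, 4);
}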
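
Similarly, the inlined replacements for the removed storage_sets_of() and
storage_nodes_of() helpers both walk every active layout version; the
difference is whether the per-version node sets are kept separate (writes
need a quorum in each version) or merged and deduplicated (table sync needs
the union). A sketch under the same simplified assumptions, with `Version`
again a stand-in holding the nodes responsible for one fixed hash:

// Sketch only: stand-in types; the real code calls LayoutVersion::nodes_of(hash).
type Uuid = u64;

struct Version {
	nodes: Vec<Uuid>, // nodes responsible for some fixed hash in this version
}

// Writes must reach a quorum in *every* active version, so each version
// contributes its own write set, kept separate from the others.
fn write_sets(versions: &[Version]) -> Vec<Vec<Uuid>> {
	versions.iter().map(|v| v.nodes.clone()).collect()
}

// Table sync and storage instead need the deduplicated union of all sets.
fn storage_nodes(versions: &[Version]) -> Vec<Uuid> {
	let mut ret: Vec<Uuid> = vec![];
	for v in versions {
		ret.extend(v.nodes.iter().copied());
	}
	ret.sort();
	ret.dedup();
	ret
}

fn main() {
	let versions = vec![
		Version { nodes: vec![1, 2, 3] },
		Version { nodes: vec![2, 3, 4] },
	];
	assert_eq!(write_sets(&versions), vec![vec![1, 2, 3], vec![2, 3, 4]]);
	assert_eq!(storage_nodes(&versions), vec![1, 2, 3, 4]);
}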