From 514eb298744d680949cbb6819ddaf0f4bad56098 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 25 Mar 2025 13:18:14 +0100
Subject: [PATCH] use a WriteLock for write operations on fullcopy tables

---
 src/rpc/layout/helper.rs          | 13 +++++++++++++
 src/rpc/layout/manager.rs         | 13 ++++++-------
 src/table/replication/fullcopy.rs | 11 ++++-------
 src/table/replication/sharded.rs  | 11 ++++-------
 4 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 482a2eea..6614ac22 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -196,6 +196,19 @@ impl LayoutHelper {
 		self.current().nodes_of(hash).collect()
 	}
 
+	/// For a given hash, or for the whole cluster if no hash is given,
+	/// return for each layout version the set of nodes that writes should be sent to
+	/// and for which a quorum of OK responses should be awaited.
+	pub fn write_sets_of(&self, hash: Option<&Hash>) -> Vec<Vec<Uuid>> {
+		self.versions()
+			.iter()
+			.map(|x| match hash {
+				Some(h) => x.nodes_of(h).collect(),
+				None => x.all_nodes().to_vec(),
+			})
+			.collect()
+	}
+
 	pub fn ack_map_min(&self) -> u64 {
 		self.ack_map_min
 	}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 55b67a27..0c75742b 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -143,20 +143,19 @@ impl LayoutManager {
 
 	// ---- ACK LOCKING ----
 
-	pub fn write_sets_of(self: &Arc<Self>, hash: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+	pub fn write_lock_with<T, F>(self: &Arc<Self>, f: F) -> WriteLock<T>
+	where
+		F: FnOnce(&LayoutHelper) -> T,
+	{
 		let layout = self.layout();
 		let version = layout.current().version;
-		let nodes = layout
-			.versions()
-			.iter()
-			.map(|x| x.nodes_of(hash).collect())
-			.collect();
+		let value = f(&layout);
 		layout
 			.ack_lock
 			.get(&version)
 			.unwrap()
 			.fetch_add(1, Ordering::Relaxed);
-		WriteLock::new(version, self, nodes)
+		WriteLock::new(version, self, value)
 	}
 
 	// ---- INTERNALS ---
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 63ca5181..cdb7361f 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -25,7 +25,7 @@ pub struct TableFullReplication {
 }
 
 impl TableReplication for TableFullReplication {
-	type WriteSets = Vec<Vec<Uuid>>;
+	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
 	// Do anti-entropy every 10 seconds.
 	// Compared to sharded tables, anti-entropy is much less costly as there is
@@ -52,11 +52,8 @@ impl TableReplication for TableFullReplication {
 
 	fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
 		self.system
-			.cluster_layout()
-			.versions()
-			.iter()
-			.map(|ver| ver.all_nodes().to_vec())
-			.collect()
+			.layout_manager
+			.write_lock_with(|l| l.write_sets_of(None))
 	}
 	fn write_quorum(&self) -> usize {
 		let layout = self.system.cluster_layout();
@@ -92,7 +89,7 @@ impl TableReplication for TableFullReplication {
 			partition: 0u16,
 			first_hash: [0u8; 32].into(),
 			last_hash: [0xff; 32].into(),
-			storage_sets: self.write_sets(&[0u8; 32].into()),
+			storage_sets: layout.write_sets_of(None),
 		}];
 
 		SyncPartitions {
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index e1041174..4f73b277 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -54,7 +54,9 @@ impl TableReplication for TableShardedReplication {
 	}
 
 	fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-		self.system.layout_manager.write_sets_of(hash)
+		self.system
+			.layout_manager
+			.write_lock_with(|l| l.write_sets_of(Some(hash)))
 	}
 	fn write_quorum(&self) -> usize {
 		self.write_quorum
@@ -72,16 +74,11 @@ impl TableReplication for TableShardedReplication {
 			.current()
 			.partitions()
 			.map(|(partition, first_hash)| {
-				let storage_sets = layout
-					.versions()
-					.iter()
-					.map(|x| x.nodes_of(&first_hash).collect())
-					.collect();
 				SyncPartition {
 					partition,
 					first_hash,
 					last_hash: [0u8; 32].into(), // filled in just after
-					storage_sets,
+					storage_sets: layout.write_sets_of(Some(&first_hash)),
 				}
 			})
 			.collect::<Vec<_>>();
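
Note (not part of the patch): the refactor replaces the specialized LayoutManager::write_sets_of(hash) with a generic write_lock_with(f) that wraps any value computed from the current layout in a WriteLock, after incrementing the ack-lock counter of the current layout version. This lets both replication strategies reuse the same ack-locking path while choosing what to compute: all nodes of every layout version for fullcopy, the nodes of one hash for sharded. The sketch below is a minimal, self-contained model of that RAII pattern, assuming the lock releases its version's counter when dropped; the names AckLocks and lock_with, and the Drop implementation, are illustrative assumptions, not Garage's actual code (the patch only shows that constructing a WriteLock bumps an ack_lock counter).

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Per-layout-version counters of in-flight writes (stand-in for ack_lock).
    struct AckLocks {
        counters: HashMap<u64, Arc<AtomicUsize>>,
    }

    // Simplified WriteLock<T>: carries the value computed from the layout
    // (e.g. the write sets) and releases its version's counter when dropped.
    struct WriteLock<T> {
        value: T,
        counter: Arc<AtomicUsize>,
    }

    impl<T> WriteLock<T> {
        fn new(value: T, counter: Arc<AtomicUsize>) -> Self {
            counter.fetch_add(1, Ordering::Relaxed);
            WriteLock { value, counter }
        }
        fn value(&self) -> &T {
            &self.value
        }
    }

    impl<T> Drop for WriteLock<T> {
        fn drop(&mut self) {
            // Assumed behavior: dropping the lock marks the write as
            // finished for its layout version.
            self.counter.fetch_sub(1, Ordering::Relaxed);
        }
    }

    impl AckLocks {
        // Analogue of write_lock_with: compute a value for the current
        // version, register the in-flight write, return both together.
        fn lock_with<T>(&self, version: u64, f: impl FnOnce() -> T) -> WriteLock<T> {
            let counter = self.counters.get(&version).unwrap().clone();
            WriteLock::new(f(), counter)
        }
    }

    fn main() {
        let mut counters = HashMap::new();
        counters.insert(1u64, Arc::new(AtomicUsize::new(0)));
        let locks = AckLocks { counters };

        let lock = locks.lock_with(1, || vec![vec!["node-a", "node-b"]]);
        assert_eq!(locks.counters[&1].load(Ordering::Relaxed), 1);
        println!("write sets: {:?}", lock.value());

        drop(lock); // counter returns to 0 once the write completes
        assert_eq!(locks.counters[&1].load(Ordering::Relaxed), 0);
    }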