use a WriteLock for write operations on fullcopy tables

Alex 2025-03-25 13:18:14 +01:00
parent 8ba6454e21
commit 514eb29874
4 changed files with 27 additions and 21 deletions


@@ -196,6 +196,19 @@ impl LayoutHelper {
         self.current().nodes_of(hash).collect()
     }
 
+    /// For a given hash, or for the whole cluster if no hash is given,
+    /// return for each layout version the set of nodes that writes should be sent to
+    /// and for which a quorum of OK responses should be awaited.
+    pub fn write_sets_of(&self, hash: Option<&Hash>) -> Vec<Vec<Uuid>> {
+        self.versions()
+            .iter()
+            .map(|x| match hash {
+                Some(h) => x.nodes_of(h).collect(),
+                None => x.all_nodes().to_vec(),
+            })
+            .collect()
+    }
+
     pub fn ack_map_min(&self) -> u64 {
         self.ack_map_min
     }
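A note on the return shape: the outer Vec has one entry per live layout version, so during a layout change a write must be ack'd by a quorum in each version's node set. Below is a self-contained sketch of the Option dispatch and the resulting shape; VersionSketch and the integer node IDs are hypothetical stand-ins, not Garage's LayoutVersion and Uuid types.

    // Stand-in for a layout version; illustration only.
    struct VersionSketch {
        all: Vec<u64>, // node IDs (stand-in for Uuid)
    }

    impl VersionSketch {
        // Stand-in for LayoutVersion::nodes_of: the nodes storing `hash`.
        fn nodes_of(&self, _hash: &[u8; 32]) -> impl Iterator<Item = u64> + '_ {
            self.all.iter().copied().take(3) // pretend replication factor 3
        }
        fn all_nodes(&self) -> &[u64] {
            &self.all
        }
    }

    // Same Option-based dispatch as LayoutHelper::write_sets_of above.
    fn write_sets_of(versions: &[VersionSketch], hash: Option<&[u8; 32]>) -> Vec<Vec<u64>> {
        versions
            .iter()
            .map(|v| match hash {
                Some(h) => v.nodes_of(h).collect(),
                None => v.all_nodes().to_vec(),
            })
            .collect()
    }

    fn main() {
        let versions = vec![
            VersionSketch { all: vec![1, 2, 3, 4] },
            VersionSketch { all: vec![1, 2, 3, 4, 5] }, // a node is being added
        ];
        // Sharded case: one quorum set per version for this partition.
        assert_eq!(
            write_sets_of(&versions, Some(&[0u8; 32])),
            vec![vec![1, 2, 3], vec![1, 2, 3]]
        );
        // Fullcopy case: all nodes of each version.
        assert_eq!(
            write_sets_of(&versions, None),
            vec![vec![1, 2, 3, 4], vec![1, 2, 3, 4, 5]]
        );
    }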


@@ -143,20 +143,19 @@ impl LayoutManager {
 
     // ---- ACK LOCKING ----
 
-    pub fn write_sets_of(self: &Arc<Self>, hash: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+    pub fn write_lock_with<T, F>(self: &Arc<Self>, f: F) -> WriteLock<T>
+    where
+        F: FnOnce(&LayoutHelper) -> T,
+    {
         let layout = self.layout();
         let version = layout.current().version;
-        let nodes = layout
-            .versions()
-            .iter()
-            .map(|x| x.nodes_of(hash).collect())
-            .collect();
+        let value = f(&layout);
         layout
             .ack_lock
             .get(&version)
             .unwrap()
             .fetch_add(1, Ordering::Relaxed);
-        WriteLock::new(version, self, nodes)
+        WriteLock::new(version, self, value)
     }
 
     // ---- INTERNALS ---
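write_lock_with generalizes the old write_sets_of: the manager keeps the locking choreography (snapshot the layout, compute the value, bump the ack counter of the current version, wrap the result) while the caller's closure decides what to read from the layout under the lock. The release side of WriteLock is not shown in this hunk, but it presumably decrements the same counter on Drop. A minimal sketch of that RAII pattern, assuming stand-in types (LayoutSketch and Guard are illustrations, not Garage's types):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Stand-in layout snapshot; illustration only.
    struct LayoutSketch {
        version: u64,
        in_flight: Arc<AtomicU64>, // ack-lock counter for `version`
    }

    // Hypothetical RAII guard (not Garage's WriteLock): while it lives,
    // the counter pins `version` so the ack map cannot move past it.
    struct Guard<T> {
        version: u64,
        in_flight: Arc<AtomicU64>,
        value: T,
    }

    impl<T> Drop for Guard<T> {
        fn drop(&mut self) {
            // Write finished: un-pin the version.
            self.in_flight.fetch_sub(1, Ordering::Relaxed);
        }
    }

    // Same shape as write_lock_with: compute a value under the layout,
    // then take the ack lock before handing both back to the caller.
    fn write_lock_with<T>(layout: &LayoutSketch, f: impl FnOnce(&LayoutSketch) -> T) -> Guard<T> {
        let value = f(layout);
        layout.in_flight.fetch_add(1, Ordering::Relaxed);
        Guard {
            version: layout.version,
            in_flight: layout.in_flight.clone(),
            value,
        }
    }

    fn main() {
        let layout = LayoutSketch { version: 7, in_flight: Arc::new(AtomicU64::new(0)) };
        {
            let lock = write_lock_with(&layout, |l| l.version * 10);
            assert_eq!((lock.version, lock.value), (7, 70));
            assert_eq!(layout.in_flight.load(Ordering::Relaxed), 1); // pinned
        }
        assert_eq!(layout.in_flight.load(Ordering::Relaxed), 0); // released on drop
    }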


@@ -25,7 +25,7 @@ pub struct TableFullReplication {
 }
 
 impl TableReplication for TableFullReplication {
-    type WriteSets = Vec<Vec<Uuid>>;
+    type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
     // Do anti-entropy every 10 seconds.
     // Compared to sharded tables, anti-entropy is much less costly as there is
@@ -52,11 +52,8 @@ impl TableReplication for TableFullReplication {
     fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
         self.system
-            .cluster_layout()
-            .versions()
-            .iter()
-            .map(|ver| ver.all_nodes().to_vec())
-            .collect()
+            .layout_manager
+            .write_lock_with(|l| l.write_sets_of(None))
     }
 
     fn write_quorum(&self) -> usize {
         let layout = self.system.cluster_layout();
@@ -92,7 +89,7 @@ impl TableReplication for TableFullReplication {
             partition: 0u16,
             first_hash: [0u8; 32].into(),
             last_hash: [0xff; 32].into(),
-            storage_sets: self.write_sets(&[0u8; 32].into()),
+            storage_sets: layout.write_sets_of(None),
         }];
 
         SyncPartitions {
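With WriteSets now being WriteLock<Vec<Vec<Uuid>>>, a fullcopy write pins the current layout version from the moment its write sets are computed until the caller drops the returned value, so the ack map cannot advance past a version that still has writes in flight. A hedged sketch of the caller-side lifetime discipline (WriteLockSketch and write_sets below are hypothetical; the real table RPC path is not part of this diff):

    // Hypothetical stand-in for WriteLock<Vec<Vec<u64>>>; illustration
    // of the intended lifetime discipline only.
    struct WriteLockSketch {
        value: Vec<Vec<u64>>, // one quorum set per layout version
    }

    impl Drop for WriteLockSketch {
        fn drop(&mut self) {
            // In Garage this would release the ack lock on the version.
            println!("ack lock released");
        }
    }

    fn write_sets() -> WriteLockSketch {
        // In Garage this would go through layout_manager.write_lock_with(...).
        WriteLockSketch { value: vec![vec![1, 2, 3], vec![1, 2, 3, 4]] }
    }

    fn main() {
        let lock = write_sets(); // ack lock conceptually taken here
        for (i, set) in lock.value.iter().enumerate() {
            // Send the write to each version's set and await a quorum of
            // OK responses per set (RPC elided in this sketch).
            println!("layout version #{i}: replicate to nodes {set:?}");
        }
        // `lock` dropped here: only now may the layout ack advance.
    }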


@@ -54,7 +54,9 @@ impl TableReplication for TableShardedReplication {
     }
 
     fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-        self.system.layout_manager.write_sets_of(hash)
+        self.system
+            .layout_manager
+            .write_lock_with(|l| l.write_sets_of(Some(hash)))
     }
     fn write_quorum(&self) -> usize {
         self.write_quorum
@@ -72,16 +74,11 @@ impl TableReplication for TableShardedReplication {
             .current()
             .partitions()
             .map(|(partition, first_hash)| {
-                let storage_sets = layout
-                    .versions()
-                    .iter()
-                    .map(|x| x.nodes_of(&first_hash).collect())
-                    .collect();
                 SyncPartition {
                     partition,
                     first_hash,
                     last_hash: [0u8; 32].into(), // filled in just after
-                    storage_sets,
+                    storage_sets: layout.write_sets_of(Some(&first_hash)),
                 }
             })
             .collect::<Vec<_>>();