relocalize logic for write_sets
Some checks failed
ci/woodpecker/push/debug Pipeline failed
ci/woodpecker/pr/debug Pipeline failed

This commit is contained in:
Alex 2025-03-25 16:35:56 +01:00
parent 514eb29874
commit d25e631a4a
3 changed files with 20 additions and 19 deletions

View file

@ -196,19 +196,6 @@ impl LayoutHelper {
self.current().nodes_of(hash).collect() self.current().nodes_of(hash).collect()
} }
/// For a given hash, or for all cluster if no hash is given,
/// return for each layout version the set of nodes that writes should be sent to
/// and for which a quorum of OK responses should be awaited.
pub fn write_sets_of(&self, hash: Option<&Hash>) -> Vec<Vec<Uuid>> {
self.versions()
.iter()
.map(|x| match hash {
Some(h) => x.nodes_of(h).collect(),
None => x.all_nodes().to_vec(),
})
.collect()
}
pub fn ack_map_min(&self) -> u64 { pub fn ack_map_min(&self) -> u64 {
self.ack_map_min self.ack_map_min
} }

View file

@ -51,9 +51,7 @@ impl TableReplication for TableFullReplication {
} }
fn write_sets(&self, _hash: &Hash) -> Self::WriteSets { fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
self.system self.system.layout_manager.write_lock_with(write_sets)
.layout_manager
.write_lock_with(|l| l.write_sets_of(None))
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
let layout = self.system.cluster_layout(); let layout = self.system.cluster_layout();
@ -89,7 +87,7 @@ impl TableReplication for TableFullReplication {
partition: 0u16, partition: 0u16,
first_hash: [0u8; 32].into(), first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(), last_hash: [0xff; 32].into(),
storage_sets: layout.write_sets_of(None), storage_sets: write_sets(&layout),
}]; }];
SyncPartitions { SyncPartitions {
@ -98,3 +96,11 @@ impl TableReplication for TableFullReplication {
} }
} }
} }
fn write_sets(layout: &LayoutHelper) -> Vec<Vec<Uuid>> {
layout
.versions()
.iter()
.map(|x| x.all_nodes().to_vec())
.collect()
}

View file

@ -56,7 +56,7 @@ impl TableReplication for TableShardedReplication {
fn write_sets(&self, hash: &Hash) -> Self::WriteSets { fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
self.system self.system
.layout_manager .layout_manager
.write_lock_with(|l| l.write_sets_of(Some(hash))) .write_lock_with(|l| write_sets(l, hash))
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
self.write_quorum self.write_quorum
@ -78,7 +78,7 @@ impl TableReplication for TableShardedReplication {
partition, partition,
first_hash, first_hash,
last_hash: [0u8; 32].into(), // filled in just after last_hash: [0u8; 32].into(), // filled in just after
storage_sets: layout.write_sets_of(Some(&first_hash)), storage_sets: write_sets(&layout, &first_hash),
} }
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -97,3 +97,11 @@ impl TableReplication for TableShardedReplication {
} }
} }
} }
fn write_sets(layout: &LayoutHelper, hash: &Hash) -> Vec<Vec<Uuid>> {
layout
.versions()
.iter()
.map(|x| x.nodes_of(hash).collect())
.collect()
}