fullcopy replication: quorum reads and writes
All checks were successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2025-03-25 11:40:09 +01:00
parent 2f2a96b51d
commit 34baade499

View file

@ -27,26 +27,49 @@ impl TableReplication for TableFullReplication {
type WriteSets = Vec<Vec<Uuid>>;
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout();
layout.current().all_nodes().to_vec()
self.system.cluster_layout().all_nodes().to_vec()
}
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
self.system
.cluster_layout()
.read_version()
.all_nodes()
.to_vec()
}
fn read_quorum(&self) -> usize {
1
let layout = self.system.cluster_layout();
let nodes = layout.read_version().all_nodes();
nodes.len().div_euclid(2) + 1
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
vec![self.storage_nodes(hash)]
fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
self.system
.cluster_layout()
.versions()
.iter()
.map(|ver| ver.all_nodes().to_vec())
.collect()
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers < 3 {
1
let layout = self.system.cluster_layout();
let min_len = layout
.versions()
.iter()
.map(|x| x.all_nodes().len())
.min()
.unwrap();
let max_quorum = layout
.versions()
.iter()
.map(|x| x.all_nodes().len().div_euclid(2) + 1)
.max()
.unwrap();
if min_len < max_quorum {
warn!("Write quorum will not be respected for TableFullReplication operations due to multiple active layout versions with vastly different number of nodes");
min_len
} else {
nmembers.div_euclid(2) + 1
max_quorum
}
}
@ -56,15 +79,18 @@ impl TableReplication for TableFullReplication {
fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout();
let layout_version = layout.current().version;
let layout_version = layout.ack_map_min();
let partitions = vec![SyncPartition {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
storage_sets: self.write_sets(&[0u8; 32].into()),
}];
SyncPartitions {
layout_version,
partitions: vec![SyncPartition {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
storage_sets: vec![layout.current().all_nodes().to_vec()],
}],
partitions,
}
}
}