layout & replication mode refactoring

Alex 2025-03-25 11:05:00 +01:00
parent c9156f6828
commit 2f2a96b51d
7 changed files with 55 additions and 61 deletions


@@ -149,14 +149,27 @@ impl LayoutHelper
 		self.layout.as_ref().unwrap()
 	}
 
+	/// Returns the current layout version
 	pub fn current(&self) -> &LayoutVersion {
 		self.inner().current()
 	}
 
+	/// Returns all layout versions currently active in the cluster
 	pub fn versions(&self) -> &[LayoutVersion] {
 		&self.inner().versions
 	}
 
+	/// Returns the latest layout version from which it is safe to read data,
+	/// i.e. the version whose version number is sync_map_min
+	pub fn read_version(&self) -> &LayoutVersion {
+		let sync_min = self.sync_map_min;
+		self.versions()
+			.iter()
+			.find(|x| x.version == sync_min)
+			.or(self.versions().last())
+			.unwrap()
+	}
+
 	pub fn is_check_ok(&self) -> bool {
 		self.is_check_ok
 	}
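
For context, the new read_version() returns the versions() entry whose number equals sync_map_min, i.e. the newest layout version that every node has fully synced, and falls back to the newest active version when no entry matches. A minimal standalone sketch of this find-or-last selection, with SimpleVersion as a hypothetical stand-in for LayoutVersion:

struct SimpleVersion {
    version: u64,
}

fn read_version(versions: &[SimpleVersion], sync_map_min: u64) -> &SimpleVersion {
    versions
        .iter()
        .find(|x| x.version == sync_map_min)
        // No active version matches sync_map_min: fall back to the newest one.
        .or(versions.last())
        // A live layout always has at least one version, so unwrap is safe.
        .unwrap()
}

fn main() {
    let versions = vec![SimpleVersion { version: 3 }, SimpleVersion { version: 4 }];
    // All nodes have synced version 3: reads use version 3, not the newer 4.
    assert_eq!(read_version(&versions, 3).version, 3);
    // No version numbered 5 is active: fall back to the newest (4).
    assert_eq!(read_version(&versions, 5).version, 4);
}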
@@ -173,6 +186,16 @@ impl LayoutHelper
 		&self.all_nongateway_nodes
 	}
 
+	/// Returns the set of nodes for storing this hash in the current layout version.
+	///
+	/// Used by the block manager only: data blocks are immutable, so we don't have
+	/// to coordinate between the old set of nodes and the new set of nodes when the
+	/// layout changes. As soon as the layout change is effective, blocks can be
+	/// moved to the new set of nodes.
+	pub fn current_storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
+		self.current().nodes_of(hash).collect()
+	}
+
 	pub fn ack_map_min(&self) -> u64 {
 		self.ack_map_min
 	}
@@ -181,6 +204,8 @@ impl LayoutHelper
 		self.sync_map_min
 	}
 
+	// ---- helpers for layout synchronization ----
+
 	pub fn sync_digest(&self) -> SyncLayoutDigest {
 		SyncLayoutDigest {
 			current: self.current().version,
@@ -189,50 +214,7 @@ impl LayoutHelper
 		}
 	}
 
-	pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let sync_min = self.sync_map_min;
-		let version = self
-			.versions()
-			.iter()
-			.find(|x| x.version == sync_min)
-			.or(self.versions().last())
-			.unwrap();
-		version
-			.nodes_of(position, version.replication_factor)
-			.collect()
-	}
-
-	pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
-		self.versions()
-			.iter()
-			.map(|x| x.nodes_of(position, x.replication_factor).collect())
-			.collect()
-	}
-
-	pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let mut ret = vec![];
-		for version in self.versions().iter() {
-			ret.extend(version.nodes_of(position, version.replication_factor));
-		}
-		ret.sort();
-		ret.dedup();
-		ret
-	}
-
-	pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-		let ver = self.current();
-		ver.nodes_of(position, ver.replication_factor).collect()
-	}
-
-	pub fn trackers_hash(&self) -> Hash {
-		self.trackers_hash
-	}
-
-	pub fn staging_hash(&self) -> Hash {
-		self.staging_hash
-	}
-
-	pub fn digest(&self) -> RpcLayoutDigest {
+	pub(crate) fn digest(&self) -> RpcLayoutDigest {
 		RpcLayoutDigest {
 			current_version: self.current().version,
 			active_versions: self.versions().len(),


@@ -180,9 +180,7 @@ impl LayoutHistory
 			// Determine set of nodes for partition p in layout version v.
 			// Sort the node set to avoid duplicate computations.
-			let mut set = v
-				.nodes_of(&p_hash, v.replication_factor)
-				.collect::<Vec<Uuid>>();
+			let mut set = v.nodes_of(&p_hash).collect::<Vec<Uuid>>();
 			set.sort();
 			// If this set was already processed, skip it.


@@ -143,10 +143,14 @@ impl LayoutManager
 	// ---- ACK LOCKING ----
 
-	pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+	pub fn write_sets_of(self: &Arc<Self>, hash: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
 		let layout = self.layout();
 		let version = layout.current().version;
-		let nodes = layout.storage_sets_of(position);
+		let nodes = layout
+			.versions()
+			.iter()
+			.map(|x| x.nodes_of(hash).collect())
+			.collect();
 		layout
 			.ack_lock
 			.get(&version)


@@ -114,9 +114,7 @@ impl LayoutVersion
 	}
 
 	/// Return the n servers in which data for this hash should be replicated
-	pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
-		assert_eq!(n, self.replication_factor);
+	pub fn nodes_of(&self, position: &Hash) -> impl Iterator<Item = Uuid> + '_ {
 		let data = &self.ring_assignment_data;
 		let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
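
For intuition, the length check above suggests the shape of the ring data: one contiguous group of replication_factor node indices per partition, with the partition index taken from the top PARTITION_BITS of the hash. A simplified sketch of such a lookup; the constant value, helper names, and u8 node indices are assumptions for illustration, and the real method additionally maps indices through the version's node id table:

// Hypothetical value; only the flat per-partition layout matters here.
const PARTITION_BITS: usize = 8;

fn partition_of(hash_top: [u8; 2]) -> usize {
    // Partition index = top PARTITION_BITS of the hash.
    (u16::from_be_bytes(hash_top) >> (16 - PARTITION_BITS)) as usize
}

fn nodes_of(ring: &[u8], replication_factor: usize, hash_top: [u8; 2]) -> &[u8] {
    // Same shape check as in the diff above: the flat array holds exactly
    // replication_factor entries for each of the 2^PARTITION_BITS partitions.
    assert_eq!(ring.len(), replication_factor * (1 << PARTITION_BITS));
    let p = partition_of(hash_top);
    &ring[p * replication_factor..(p + 1) * replication_factor]
}

fn main() {
    let replication_factor = 3;
    let ring: Vec<u8> = (0..replication_factor * (1 << PARTITION_BITS))
        .map(|i| (i % 251) as u8)
        .collect();
    // A hash starting with bytes 0x01 0x00 falls in partition 1 -> entries 3..6.
    assert_eq!(nodes_of(&ring, replication_factor, [0x01, 0x00]), &[3, 4, 5][..]);
}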


@@ -573,7 +573,7 @@ impl RpcHelper
 		// Compute, for each layout version, the set of nodes that might store
 		// the block, and put them in their preferred order as of `request_order`.
 		let mut vernodes = layout.versions().iter().map(|ver| {
-			let nodes = ver.nodes_of(position, ver.replication_factor);
+			let nodes = ver.nodes_of(position);
 			rpc_helper.request_order(layout.current(), nodes)
 		});
@@ -607,7 +607,7 @@ impl RpcHelper
 		// Second step: add nodes of older layout versions
 		let old_ver_iter = layout.inner().old_versions.iter().rev();
 		for ver in old_ver_iter {
-			let nodes = ver.nodes_of(position, ver.replication_factor);
+			let nodes = ver.nodes_of(position);
 			for node in rpc_helper.request_order(layout.current(), nodes) {
 				if !ret.contains(&node) {
 					ret.push(node);
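
Together, these two hunks build the candidate list for block reads: nodes of the active versions first, then nodes of old_versions appended only if not already present. A condensed sketch of that ordering, with u64 node ids and the request_order preference ranking elided (illustrative only):

fn read_candidates(newest: &[u64], older: &[Vec<u64>]) -> Vec<u64> {
    // First step: nodes from the current/active versions.
    let mut ret: Vec<u64> = newest.to_vec();
    // Second step: nodes of older layout versions, most recent first,
    // skipping any node already in the list.
    for ver in older.iter().rev() {
        for &node in ver {
            if !ret.contains(&node) {
                ret.push(node);
            }
        }
    }
    ret
}

fn main() {
    let newest = vec![4, 5, 6];
    let older = vec![vec![1, 2, 3], vec![3, 4, 5]];
    // The most recent old version (3,4,5) is tried before the oldest (1,2,3).
    assert_eq!(read_candidates(&newest, &older), vec![4, 5, 6, 3, 1, 2]);
}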


@@ -475,10 +475,7 @@ impl System
 		let mut partitions_quorum = 0;
 		let mut partitions_all_ok = 0;
 		for (_, hash) in partitions.iter() {
-			let mut write_sets = layout
-				.versions()
-				.iter()
-				.map(|x| x.nodes_of(hash, x.replication_factor));
+			let mut write_sets = layout.versions().iter().map(|x| x.nodes_of(hash));
 			let has_quorum = write_sets
 				.clone()
 				.all(|set| set.filter(|x| node_up(x)).count() >= quorum);
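
For context, this health check requires a write quorum in the node set of every active layout version, since a write is only durable if it survives whichever version ends up current. A small sketch of the same check, with u64 node ids and a closure standing in for node_up (illustrative only):

fn partition_has_quorum(
    write_sets: &[Vec<u64>],
    node_up: impl Fn(&u64) -> bool,
    quorum: usize,
) -> bool {
    // The partition can take writes only if every active version's node
    // set has at least `quorum` nodes up.
    write_sets
        .iter()
        .all(|set| set.iter().filter(|x| node_up(x)).count() >= quorum)
}

fn main() {
    let write_sets = vec![vec![1, 2, 3], vec![2, 3, 4]];
    let up = |x: &u64| *x != 4; // node 4 is down
    // A quorum of 2 holds in both sets even with node 4 down...
    assert!(partition_has_quorum(&write_sets, up, 2));
    // ...but a full quorum of 3 does not.
    assert!(!partition_has_quorum(&write_sets, up, 3));
}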


@@ -28,11 +28,22 @@ impl TableReplication for TableShardedReplication
 	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
 	fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().storage_nodes_of(hash)
+		let layout = self.system.cluster_layout();
+		let mut ret = vec![];
+		for version in layout.versions().iter() {
+			ret.extend(version.nodes_of(hash));
+		}
+		ret.sort();
+		ret.dedup();
+		ret
 	}
 
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().read_nodes_of(hash)
+		self.system
+			.cluster_layout()
+			.read_version()
+			.nodes_of(hash)
+			.collect()
 	}
 
 	fn read_quorum(&self) -> usize {
 		self.read_quorum
@@ -57,7 +68,11 @@ impl TableReplication for TableShardedReplication
 			.current()
 			.partitions()
 			.map(|(partition, first_hash)| {
-				let storage_sets = layout.storage_sets_of(&first_hash);
+				let storage_sets = layout
+					.versions()
+					.iter()
+					.map(|x| x.nodes_of(&first_hash).collect())
+					.collect();
 				SyncPartition {
 					partition,
 					first_hash,