layout: refactoring and fix in layout helper

This commit is contained in:
Alex 2023-12-11 16:09:22 +01:00
parent adccce1145
commit 0041b013a4
Signed by: lx
GPG key ID: 0E496D15096376BE
7 changed files with 38 additions and 33 deletions

View file

@ -10,7 +10,7 @@ use super::*;
use crate::replication_mode::ReplicationMode; use crate::replication_mode::ReplicationMode;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LayoutDigest { pub struct RpcLayoutDigest {
/// Cluster layout version /// Cluster layout version
pub current_version: u64, pub current_version: u64,
/// Number of active layout versions /// Number of active layout versions
@ -21,6 +21,13 @@ pub struct LayoutDigest {
pub staging_hash: Hash, pub staging_hash: Hash,
} }
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SyncLayoutDigest {
current: u64,
ack_map_min: u64,
min_stored: u64,
}
pub struct LayoutHelper { pub struct LayoutHelper {
replication_mode: ReplicationMode, replication_mode: ReplicationMode,
layout: Option<LayoutHistory>, layout: Option<LayoutHistory>,
@ -150,20 +157,20 @@ impl LayoutHelper {
&self.all_nongateway_nodes &self.all_nongateway_nodes
} }
pub fn all_ack(&self) -> u64 { pub fn ack_map_min(&self) -> u64 {
self.ack_map_min self.ack_map_min
} }
pub fn all_sync(&self) -> u64 { pub fn sync_map_min(&self) -> u64 {
self.sync_map_min self.sync_map_min
} }
pub fn sync_versions(&self) -> (u64, u64, u64) { pub fn sync_digest(&self) -> SyncLayoutDigest {
( SyncLayoutDigest {
self.layout().current().version, current: self.layout().current().version,
self.all_ack(), ack_map_min: self.ack_map_min(),
self.layout().min_stored(), min_stored: self.layout().min_stored(),
) }
} }
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
@ -206,8 +213,8 @@ impl LayoutHelper {
self.staging_hash self.staging_hash
} }
pub fn digest(&self) -> LayoutDigest { pub fn digest(&self) -> RpcLayoutDigest {
LayoutDigest { RpcLayoutDigest {
current_version: self.current().version, current_version: self.current().version,
active_versions: self.versions.len(), active_versions: self.versions.len(),
trackers_hash: self.trackers_hash, trackers_hash: self.trackers_hash,
@ -231,13 +238,13 @@ impl LayoutHelper {
// 3. Acknowledge everyone has synced up to min(self.sync_map) // 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(local_node_id); self.sync_ack(local_node_id);
info!("ack_map: {:?}", self.update_trackers.ack_map); debug!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map); debug!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
} }
fn sync_first(&mut self, local_node_id: Uuid) { fn sync_first(&mut self, local_node_id: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version; let first_version = self.min_stored();
self.update(|layout| { self.update(|layout| {
layout layout
.update_trackers .update_trackers
@ -275,13 +282,13 @@ impl LayoutHelper {
.versions .versions
.iter() .iter()
.map(|x| x.version) .map(|x| x.version)
.take_while(|v| { .skip_while(|v| {
self.ack_lock self.ack_lock
.get(v) .get(v)
.map(|x| x.load(Ordering::Relaxed) == 0) .map(|x| x.load(Ordering::Relaxed) == 0)
.unwrap_or(true) .unwrap_or(true)
}) })
.max() .next()
.unwrap_or(self.min_stored()) .unwrap_or(self.current().version)
} }
} }

View file

@ -256,7 +256,7 @@ impl LayoutManager {
// ---- RPC HANDLERS ---- // ---- RPC HANDLERS ----
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) { pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) {
let local = self.layout().digest(); let local = self.layout().digest();
if remote.current_version > local.current_version if remote.current_version > local.current_version
|| remote.active_versions != local.active_versions || remote.active_versions != local.active_versions

View file

@ -17,7 +17,7 @@ pub mod manager;
// ---- re-exports ---- // ---- re-exports ----
pub use helper::{LayoutDigest, LayoutHelper}; pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest};
pub use manager::WriteLock; pub use manager::WriteLock;
pub use version::*; pub use version::*;

View file

@ -502,7 +502,7 @@ impl RpcHelper {
.rev() .rev()
.chain(layout.old_versions.iter().rev()); .chain(layout.old_versions.iter().rev());
for ver in ver_iter { for ver in ver_iter {
if ver.version > layout.all_sync() { if ver.version > layout.sync_map_min() {
continue; continue;
} }
let nodes = ver.nodes_of(position, ver.replication_factor); let nodes = ver.nodes_of(position, ver.replication_factor);

View file

@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::{ use crate::layout::{
self, manager::LayoutManager, LayoutDigest, LayoutHelper, LayoutHistory, NodeRoleV, self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest,
}; };
use crate::replication_mode::*; use crate::replication_mode::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -132,7 +132,7 @@ pub struct NodeStatus {
pub replication_factor: usize, pub replication_factor: usize,
/// Cluster layout digest /// Cluster layout digest
pub layout_digest: LayoutDigest, pub layout_digest: RpcLayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)] #[serde(default)]

View file

@ -54,7 +54,7 @@ impl TableReplication for TableShardedReplication {
fn sync_partitions(&self) -> SyncPartitions { fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout(); let layout = self.system.cluster_layout();
let layout_version = layout.all_ack(); let layout_version = layout.ack_map_min();
let mut partitions = layout let mut partitions = layout
.current() .current()

View file

@ -83,7 +83,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker { bg.spawn_worker(SyncWorker {
syncer: self.clone(), syncer: self.clone(),
layout_notify: self.system.layout_notify(), layout_notify: self.system.layout_notify(),
layout_versions: self.system.cluster_layout().sync_versions(), layout_digest: self.system.cluster_layout().sync_digest(),
add_full_sync_rx, add_full_sync_rx,
todo: None, todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20), next_full_sync: Instant::now() + Duration::from_secs(20),
@ -483,7 +483,7 @@ struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>, syncer: Arc<TableSyncer<F, R>>,
layout_notify: Arc<Notify>, layout_notify: Arc<Notify>,
layout_versions: (u64, u64, u64), layout_digest: SyncLayoutDigest,
add_full_sync_rx: mpsc::UnboundedReceiver<()>, add_full_sync_rx: mpsc::UnboundedReceiver<()>,
next_full_sync: Instant, next_full_sync: Instant,
@ -493,15 +493,13 @@ struct SyncWorker<F: TableSchema, R: TableReplication> {
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn check_add_full_sync(&mut self) { fn check_add_full_sync(&mut self) {
let layout_versions = self.syncer.system.cluster_layout().sync_versions(); let layout_digest = self.syncer.system.cluster_layout().sync_digest();
if layout_versions != self.layout_versions { if layout_digest != self.layout_digest {
self.layout_versions = layout_versions; self.layout_digest = layout_digest;
info!( info!(
"({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", "({}) Layout versions changed ({:?}), adding full sync to syncer todo list",
F::TABLE_NAME, F::TABLE_NAME,
layout_versions.0, layout_digest,
layout_versions.1,
layout_versions.2
); );
self.add_full_sync(); self.add_full_sync();
} }