layout: refactor digests and add "!=" assertions before epidemic bcast

This commit is contained in:
Alex 2023-11-16 13:51:40 +01:00
parent ad5c6f779f
commit 707442f5de
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
5 changed files with 45 additions and 38 deletions

View file

@ -2,10 +2,24 @@ use std::collections::HashMap;
use std::ops::Deref; use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use super::schema::*; use super::schema::*;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LayoutDigest {
/// Cluster layout version
pub current_version: u64,
/// Number of active layout versions
pub active_versions: usize,
/// Hash of cluster layout update trackers
pub trackers_hash: Hash,
/// Hash of cluster layout staging data
pub staging_hash: Hash,
}
pub struct LayoutHelper { pub struct LayoutHelper {
layout: Option<LayoutHistory>, layout: Option<LayoutHistory>,
@ -16,8 +30,8 @@ pub struct LayoutHelper {
all_nodes: Vec<Uuid>, all_nodes: Vec<Uuid>,
all_nongateway_nodes: Vec<Uuid>, all_nongateway_nodes: Vec<Uuid>,
pub(crate) trackers_hash: Hash, trackers_hash: Hash,
pub(crate) staging_hash: Hash, staging_hash: Hash,
// ack lock: counts in-progress write operations for each // ack lock: counts in-progress write operations for each
// layout version ; we don't increase the ack update tracker // layout version ; we don't increase the ack update tracker
@ -152,6 +166,15 @@ impl LayoutHelper {
self.staging_hash self.staging_hash
} }
pub fn digest(&self) -> LayoutDigest {
LayoutDigest {
current_version: self.current().version,
active_versions: self.versions.len(),
trackers_hash: self.trackers_hash,
staging_hash: self.staging_hash,
}
}
// ------------------ helpers for update tracking --------------- // ------------------ helpers for update tracking ---------------
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {

View file

@ -5,7 +5,6 @@ use garage_util::data::*;
use garage_util::encode::nonversioned_encode; use garage_util::encode::nonversioned_encode;
use garage_util::error::*; use garage_util::error::*;
use super::schema::*;
use super::*; use super::*;
impl LayoutHistory { impl LayoutHistory {

View file

@ -2,8 +2,6 @@ use std::collections::HashMap;
use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard}; use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration; use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify; use tokio::sync::Notify;
use netapp::endpoint::Endpoint; use netapp::endpoint::Endpoint;
@ -33,16 +31,6 @@ pub struct LayoutManager {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LayoutStatus {
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
}
impl LayoutManager { impl LayoutManager {
pub fn new( pub fn new(
config: &Config, config: &Config,
@ -105,15 +93,6 @@ impl LayoutManager {
self.layout.read().unwrap() self.layout.read().unwrap()
} }
pub fn status(&self) -> LayoutStatus {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
cluster_layout_trackers_hash: layout.trackers_hash(),
cluster_layout_staging_hash: layout.staging_hash(),
}
}
pub async fn update_cluster_layout( pub async fn update_cluster_layout(
self: &Arc<Self>, self: &Arc<Self>,
layout: &LayoutHistory, layout: &LayoutHistory,
@ -173,6 +152,7 @@ impl LayoutManager {
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> { fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
let mut layout = self.layout.write().unwrap(); let mut layout = self.layout.write().unwrap();
let prev_digest = layout.digest();
let prev_layout_check = layout.check().is_ok(); let prev_layout_check = layout.check().is_ok();
if !prev_layout_check || adv.check().is_ok() { if !prev_layout_check || adv.check().is_ok() {
@ -181,6 +161,7 @@ impl LayoutManager {
if prev_layout_check && layout.check().is_err() { if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout."); panic!("Merged two correct layouts and got an incorrect layout.");
} }
assert!(layout.digest() != prev_digest);
return Some(layout.clone()); return Some(layout.clone());
} }
} }
@ -190,10 +171,12 @@ impl LayoutManager {
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> { fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap(); let mut layout = self.layout.write().unwrap();
let prev_digest = layout.digest();
if layout.update_trackers != *adv { if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) { if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers(self.node_id); layout.update_trackers(self.node_id);
assert!(layout.digest() != prev_digest);
return Some(layout.update_trackers.clone()); return Some(layout.update_trackers.clone());
} }
} }
@ -269,16 +252,17 @@ impl LayoutManager {
// ---- RPC HANDLERS ---- // ---- RPC HANDLERS ----
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) { pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) {
let local = self.status(); let local = self.layout().digest();
if remote.cluster_layout_version > local.cluster_layout_version if remote.current_version > local.current_version
|| remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash || remote.active_versions != local.active_versions
|| remote.staging_hash != local.staging_hash
{ {
tokio::spawn({ tokio::spawn({
let this = self.clone(); let this = self.clone();
async move { this.pull_cluster_layout(from).await } async move { this.pull_cluster_layout(from).await }
}); });
} else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash { } else if remote.trackers_hash != local.trackers_hash {
tokio::spawn({ tokio::spawn({
let this = self.clone(); let this = self.clone();
async move { this.pull_cluster_layout_trackers(from).await } async move { this.pull_cluster_layout_trackers(from).await }

View file

@ -11,7 +11,7 @@ pub mod manager;
// ---- re-exports ---- // ---- re-exports ----
pub use helper::LayoutHelper; pub use helper::{LayoutDigest, LayoutHelper};
pub use manager::WriteLock; pub use manager::WriteLock;
pub use schema::*; pub use schema::*;
pub use version::*; pub use version::*;

View file

@ -33,8 +33,9 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery; use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::manager::{LayoutManager, LayoutStatus}; use crate::layout::{
use crate::layout::{self, LayoutHelper, LayoutHistory, NodeRoleV}; self, manager::LayoutManager, LayoutDigest, LayoutHelper, LayoutHistory, NodeRoleV,
};
use crate::replication_mode::*; use crate::replication_mode::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -130,8 +131,8 @@ pub struct NodeStatus {
/// Replication factor configured on the node /// Replication factor configured on the node
pub replication_factor: usize, pub replication_factor: usize,
/// Layout status /// Cluster layout digest
pub layout_status: LayoutStatus, pub layout_digest: LayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)] #[serde(default)]
@ -539,7 +540,7 @@ impl System {
fn update_local_status(&self) { fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
new_si.layout_status = self.layout_manager.status(); new_si.layout_digest = self.layout_manager.layout().digest();
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@ -573,7 +574,7 @@ impl System {
} }
self.layout_manager self.layout_manager
.handle_advertise_status(from, &info.layout_status); .handle_advertise_status(from, &info.layout_digest);
self.node_status self.node_status
.write() .write()
@ -755,7 +756,7 @@ impl NodeStatus {
.into_string() .into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()), .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor, replication_factor,
layout_status: layout_manager.status(), layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,
} }
@ -765,7 +766,7 @@ impl NodeStatus {
NodeStatus { NodeStatus {
hostname: "?".to_string(), hostname: "?".to_string(),
replication_factor: 0, replication_factor: 0,
layout_status: Default::default(), layout_digest: Default::default(),
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,
} }