layout: separate code path for synchronizing update trackers only

This commit is contained in:
Alex 2023-11-09 14:53:34 +01:00
parent bfb1845fdc
commit 94caf9c0c1
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 178 additions and 71 deletions

View file

@ -18,10 +18,11 @@ impl LayoutHistory {
let mut ret = LayoutHistory {
versions: vec![version].into_boxed_slice().into(),
update_trackers: Default::default(),
trackers_hash: [0u8; 32].into(),
staging: Lww::raw(0, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
ret.update_hashes();
ret
}
@ -29,6 +30,15 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
pub(crate) fn update_hashes(&mut self) {
self.trackers_hash = self.calculate_trackers_hash();
self.staging_hash = self.calculate_staging_hash();
}
pub(crate) fn calculate_trackers_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
}
pub(crate) fn calculate_staging_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
@ -38,12 +48,6 @@ impl LayoutHistory {
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
let mut changed = false;
// Merge staged layout changes
if self.staging != other.staging {
changed = true;
}
self.staging.merge(&other.staging);
// Add any new versions to history
for v2 in other.versions.iter() {
if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
@ -63,7 +67,21 @@ impl LayoutHistory {
}
// Merge trackers
self.update_trackers.merge(&other.update_trackers);
if self.update_trackers != other.update_trackers {
let c = self.update_trackers.merge(&other.update_trackers);
changed = changed || c;
}
// Merge staged layout changes
if self.staging != other.staging {
self.staging.merge(&other.staging);
changed = true;
}
// Update hashes if there are changes
if changed {
self.update_hashes();
}
changed
}
@ -100,7 +118,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: self.staging.get().parameters.clone(),
roles: LwwMap::new(),
});
self.staging_hash = self.calculate_staging_hash();
self.update_hashes();
Ok((self, msg))
}
@ -110,20 +128,25 @@ To know the correct value of the new layout version, invoke `garage layout show`
parameters: Lww::new(self.current().parameters.clone()),
roles: LwwMap::new(),
});
self.staging_hash = self.calculate_staging_hash();
self.update_hashes();
Ok(self)
}
pub fn check(&self) -> Result<(), String> {
// Check that the hash of the staging data is correct
let staging_hash = self.calculate_staging_hash();
if staging_hash != self.staging_hash {
if self.trackers_hash != self.calculate_trackers_hash() {
return Err("trackers_hash is incorrect".into());
}
if self.staging_hash != self.calculate_staging_hash() {
return Err("staging_hash is incorrect".into());
}
// TODO: anythign more ?
for version in self.versions.iter() {
version.check()?;
}
self.current().check()
// TODO: anythign more ?
Ok(())
}
}

View file

@ -19,6 +19,7 @@ use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
node_id: Uuid,
replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>,
@ -34,7 +35,7 @@ pub struct LayoutStatus {
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
// (TODO) pub cluster_layout_trackers_hash: Hash,
pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
}
@ -81,6 +82,7 @@ impl LayoutManager {
);
Ok(Arc::new(Self {
node_id: node_id.into(),
replication_factor,
persist_cluster_layout,
layout,
@ -92,10 +94,15 @@ impl LayoutManager {
// ---- PUBLIC INTERFACE ----
pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
self.layout.read().unwrap()
}
pub fn status(&self) -> LayoutStatus {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
cluster_layout_trackers_hash: layout.trackers_hash,
cluster_layout_staging_hash: layout.staging_hash,
}
}
@ -108,37 +115,8 @@ impl LayoutManager {
Ok(())
}
pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
self.layout.read().unwrap()
}
pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
}
}
// ---- INTERNALS ---
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
self.persist_cluster_layout
.save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
}
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
let mut layout = self.layout.write().unwrap();
let prev_layout_check = layout.check().is_ok();
@ -155,17 +133,98 @@ impl LayoutManager {
None
}
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
if layout.update_trackers.merge(adv) {
return Some(layout.update_trackers.clone());
}
}
None
}
async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
if let Err(e) = self.handle_advertise_cluster_layout(&layout).await {
warn!("In pull_cluster_layout: {}", e);
}
}
}
async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayoutTrackers,
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
if let Err(e) = self
.handle_advertise_cluster_layout_trackers(&trackers)
.await
{
warn!("In pull_cluster_layout_trackers: {}", e);
}
}
}
/// Save cluster layout data to disk
async fn save_cluster_layout(&self) -> Result<(), Error> {
let layout = self.layout.read().unwrap().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
}
fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
tokio::spawn({
let this = self.clone();
async move {
if let Err(e) = this
.rpc_helper
.broadcast(
&this.system_endpoint,
rpc,
RequestStrategy::with_priority(PRIO_HIGH),
)
.await
{
warn!("Error while broadcasting new cluster layout: {}", e);
}
}
});
}
// ---- RPC HANDLERS ----
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
let local_status = self.status();
if status.cluster_layout_version > local_status.cluster_layout_version
|| status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutStatus) {
let local = self.status();
if remote.cluster_layout_version > local.cluster_layout_version
|| remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
{
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout(from).await }
});
} else if remote.cluster_layout_trackers_hash != local.cluster_layout_trackers_hash {
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout_trackers(from).await }
});
}
}
@ -174,6 +233,11 @@ impl LayoutManager {
SystemRpc::AdvertiseClusterLayout(layout)
}
pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc {
let layout = self.layout.read().unwrap();
SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone())
}
pub(crate) async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
adv: &LayoutHistory,
@ -190,24 +254,20 @@ impl LayoutManager {
if let Some(new_layout) = self.merge_layout(adv) {
self.change_notify.notify_waiters();
self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
self.save_cluster_layout().await?;
}
tokio::spawn({
let this = self.clone();
async move {
if let Err(e) = this
.rpc_helper
.broadcast(
&this.system_endpoint,
SystemRpc::AdvertiseClusterLayout(new_layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await
{
warn!("Error while broadcasting new cluster layout: {}", e);
}
}
});
Ok(SystemRpc::Ok)
}
pub(crate) async fn handle_advertise_cluster_layout_trackers(
self: &Arc<Self>,
trackers: &UpdateTrackers,
) -> Result<SystemRpc, Error> {
if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
self.change_notify.notify_waiters();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
self.save_cluster_layout().await?;
}

View file

@ -233,6 +233,8 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
/// Hash of the update trackers
pub trackers_hash: Hash,
/// Staged changes for the next version
pub staging: Lww<LayoutStaging>,
@ -289,10 +291,12 @@ mod v010 {
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
trackers_hash: [0u8; 32].into(),
staging: Lww::raw(previous.version, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
ret.trackers_hash = ret.calculate_trackers_hash();
ret
}
}
@ -355,14 +359,20 @@ impl core::str::FromStr for ZoneRedundancy {
}
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) {
fn merge(&mut self, other: &UpdateTracker) -> bool {
let mut changed = false;
for (k, v) in other.0.iter() {
if let Some(v_mut) = self.0.get_mut(k) {
*v_mut = std::cmp::max(*v_mut, *v);
if *v > *v_mut {
*v_mut = *v;
changed = true;
}
} else {
self.0.insert(*k, *v);
changed = true;
}
}
changed
}
pub(crate) fn min(&self) -> u64 {
@ -371,9 +381,10 @@ impl UpdateTracker {
}
impl UpdateTrackers {
pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
self.ack_map.merge(&other.ack_map);
self.sync_map.merge(&other.sync_map);
self.sync_ack_map.merge(&other.sync_ack_map);
pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
let c1 = self.ack_map.merge(&other.ack_map);
let c2 = self.sync_map.merge(&other.sync_map);
let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
c1 || c2 || c3
}
}

View file

@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::manager::{LayoutManager, LayoutStatus};
use crate::layout::*;
use crate::layout::{self, LayoutHistory, NodeRoleV};
use crate::replication_mode::*;
use crate::rpc_helper::*;
@ -65,10 +65,15 @@ pub enum SystemRpc {
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
AdvertiseClusterLayout(LayoutHistory),
/// Ask other node its cluster layout update trackers.
PullClusterLayoutTrackers,
/// Advertisement of cluster layout update trackers.
AdvertiseClusterLayoutTrackers(layout::UpdateTrackers),
}
impl Rpc for SystemRpc {
@ -727,6 +732,14 @@ impl EndpointHandler<SystemRpc> for System {
.handle_advertise_cluster_layout(adv)
.await
}
SystemRpc::PullClusterLayoutTrackers => {
Ok(self.layout_manager.handle_pull_cluster_layout_trackers())
}
SystemRpc::AdvertiseClusterLayoutTrackers(adv) => {
self.layout_manager
.handle_advertise_cluster_layout_trackers(adv)
.await
}
// ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),