diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index d912b58f8..593bd7785 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -240,7 +240,6 @@ pub async fn handle_update_cluster_layout( .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } - layout.update_hashes(); garage .system .layout_manager diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index c189232ad..e8cd1fba8 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -29,7 +29,7 @@ pub async fn handle_read_index( .system .cluster_layout() .all_nongateway_nodes() - .into_owned(); + .to_vec(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 0f01a37a1..51774314a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -49,7 +49,7 @@ pub async fn cmd_assign_role( }; let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - let all_nodes = layout.all_nodes().into_owned(); + let all_nodes = layout.get_all_nodes(); let added_nodes = args .node_ids @@ -331,7 +331,6 @@ pub async fn send_layout( rpc_host: NodeID, mut layout: LayoutHistory, ) -> Result<(), Error> { - layout.update_hashes(); rpc_cli .call( &rpc_host, diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 2cb53424e..efa3e27b9 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -455,7 +455,7 @@ impl<'a> BucketHelper<'a> { .system .cluster_layout() .all_nongateway_nodes() - .into_owned(); + .to_vec(); let k2vindexes = self .0 .k2v diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2d9687333..e8702bf12 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::layout::LayoutHistory; +use garage_rpc::layout::LayoutHelper; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,7 +83,7 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap { + pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap { let nodes = layout.all_nongateway_nodes(); self.filtered_values_with_nodes(&nodes) } diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 1684918e6..b6f0e4953 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,5 @@ -use std::borrow::Cow; use std::collections::HashSet; +use std::ops::Deref; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -9,95 +9,106 @@ use garage_util::error::*; use super::schema::*; use super::*; +pub struct LayoutHelper { + layout: Option, -impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + // cached values + ack_map_min: u64, + sync_map_min: u64, - let staging = LayoutStaging { - parameters: Lww::::new(version.parameters), - roles: LwwMap::new(), - }; + all_nodes: Vec, + all_nongateway_nodes: Vec, - let mut ret = LayoutHistory { - versions: vec![version], - update_trackers: Default::default(), - trackers_hash: [0u8; 32].into(), - staging: Lww::raw(0, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret + trackers_hash: Hash, + staging_hash: Hash, +} + +impl Deref for LayoutHelper { + type Target = LayoutHistory; + fn deref(&self) -> &LayoutHistory { + self.layout() + } +} + +impl LayoutHelper { + pub fn new(mut layout: LayoutHistory) -> Self { + layout.cleanup_old_versions(); + + let all_nongateway_nodes = layout.get_all_nongateway_nodes(); + layout.clamp_update_trackers(&all_nongateway_nodes); + + let min_version = layout.min_stored(); + let ack_map_min = layout + .update_trackers + .ack_map + .min(&all_nongateway_nodes, min_version); + let sync_map_min = layout + .update_trackers + .sync_map + .min(&all_nongateway_nodes, min_version); + + let all_nodes = layout.get_all_nodes(); + let trackers_hash = layout.calculate_trackers_hash(); + let staging_hash = layout.calculate_staging_hash(); + + LayoutHelper { + layout: Some(layout), + ack_map_min, + sync_map_min, + all_nodes, + all_nongateway_nodes, + trackers_hash, + staging_hash, + } } - pub fn current(&self) -> &LayoutVersion { - self.versions.last().as_ref().unwrap() + // ------------------ single updating function -------------- + + fn layout(&self) -> &LayoutHistory { + self.layout.as_ref().unwrap() } - pub fn update_hashes(&mut self) { - self.trackers_hash = self.calculate_trackers_hash(); - self.staging_hash = self.calculate_staging_hash(); + pub(crate) fn update(&mut self, f: F) -> bool + where + F: FnOnce(&mut LayoutHistory) -> bool, + { + let changed = f(&mut self.layout.as_mut().unwrap()); + if changed { + *self = Self::new(self.layout.take().unwrap()); + } + changed } - pub(crate) fn calculate_trackers_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + // ------------------ read helpers --------------- + + pub fn all_nodes(&self) -> &[Uuid] { + &self.all_nodes } - pub(crate) fn calculate_staging_hash(&self) -> Hash { - blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + pub fn all_nongateway_nodes(&self) -> &[Uuid] { + &self.all_nongateway_nodes } - // ------------------ who stores what now? --------------- - pub fn all_ack(&self) -> u64 { - self.update_trackers.ack_map.current_min - } - - pub fn min_stored(&self) -> u64 { - self.versions.first().as_ref().unwrap().version + self.ack_map_min } pub fn sync_versions(&self) -> (u64, u64, u64) { - (self.current().version, self.all_ack(), self.min_stored()) - } - - pub fn all_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].all_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.all_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } - } - - pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { - // TODO: cache this - if self.versions.len() == 1 { - self.versions[0].nongateway_nodes().into() - } else { - let set = self - .versions - .iter() - .map(|x| x.nongateway_nodes()) - .flatten() - .collect::>(); - set.into_iter().copied().collect::>().into() - } + ( + self.layout().current().version, + self.all_ack(), + self.layout().min_stored(), + ) } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.update_trackers.sync_map.current_min; + let sync_min = self.sync_map_min; let version = self + .layout() .versions .iter() .find(|x| x.version == sync_min) - .or(self.versions.last()) + .or(self.layout().versions.last()) .unwrap(); version .nodes_of(position, version.replication_factor) @@ -105,7 +116,8 @@ impl LayoutHistory { } pub fn write_sets_of(&self, position: &Hash) -> Vec> { - self.versions + self.layout() + .versions .iter() .map(|x| x.nodes_of(position, x.replication_factor).collect()) .collect() @@ -113,7 +125,7 @@ impl LayoutHistory { pub fn storage_nodes_of(&self, position: &Hash) -> Vec { let mut ret = vec![]; - for version in self.versions.iter() { + for version in self.layout().versions.iter() { ret.extend(version.nodes_of(position, version.replication_factor)); } ret.sort(); @@ -121,7 +133,35 @@ impl LayoutHistory { ret } - // ------------------ update tracking --------------- + pub fn trackers_hash(&self) -> Hash { + self.trackers_hash + } + + pub fn staging_hash(&self) -> Hash { + self.staging_hash + } + + // ------------------ helpers for update tracking --------------- + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version)); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(node, sync_map_min) + }); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version)); + } pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date @@ -136,55 +176,104 @@ impl LayoutHistory { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(node_id); - // 4. Cleanup layout versions that are not needed anymore - self.cleanup_old_versions(); - - // 5. Recalculate global minima - self.update_trackers_min(); - info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); - - // Finally, update hashes - self.update_hashes(); } +} - fn update_trackers_min(&mut self) { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes().into_owned(); - let min_version = self.versions.first().unwrap().version; - self.update_trackers.update_min(&storage_nodes, min_version); - } +// ---- - pub(crate) fn ack_last(&mut self, node: Uuid) { - let last_version = self.current().version; - self.update_trackers.ack_map.set_max(node, last_version); - self.update_trackers_min(); - } +impl LayoutHistory { + pub fn new(replication_factor: usize) -> Self { + let version = LayoutVersion::new(replication_factor); - pub(crate) fn sync_first(&mut self, node: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; - self.update_trackers.sync_map.set_max(node, first_version); - self.update_trackers_min(); - } + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; - pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers - .sync_ack_map - .set_max(node, self.update_trackers.sync_map.current_min); - self.update_trackers_min(); - } - - pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.update_trackers.sync_ack_map.current_min; - while self.versions.first().as_ref().unwrap().version < min_sync_ack { - let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + LayoutHistory { + versions: vec![version], + update_trackers: Default::default(), + staging: Lww::raw(0, staging), } } + // ------------------ who stores what now? --------------- + + pub fn current(&self) -> &LayoutVersion { + self.versions.last().as_ref().unwrap() + } + + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn get_all_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].all_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.all_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } + } + + fn get_all_nongateway_nodes(&self) -> Vec { + if self.versions.len() == 1 { + self.versions[0].nongateway_nodes().to_vec() + } else { + let set = self + .versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>(); + set.into_iter().copied().collect::>() + } + } + + // ---- housekeeping (all invoked by LayoutHelper) ---- + + fn cleanup_old_versions(&mut self) { + loop { + let all_nongateway_nodes = self.get_all_nongateway_nodes(); + let min_version = self.min_stored(); + let sync_ack_map_min = self + .update_trackers + .sync_ack_map + .min(&all_nongateway_nodes, min_version); + if self.min_stored() < sync_ack_map_min { + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); + } else { + break; + } + } + } + + fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { + let min_v = self.min_stored(); + for node in nodes { + self.update_trackers.ack_map.set_max(*node, min_v); + self.update_trackers.sync_map.set_max(*node, min_v); + self.update_trackers.sync_ack_map.set_max(*node, min_v); + } + } + + fn calculate_trackers_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..]) + } + + fn calculate_staging_hash(&self) -> Hash { + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -221,20 +310,6 @@ impl LayoutHistory { self.versions.remove(0); changed = true; } - if changed { - let min_v = self.versions.first().unwrap().version; - let nodes = self.all_nongateway_nodes().into_owned(); - for node in nodes { - self.update_trackers.ack_map.set_max(node, min_v); - self.update_trackers.sync_map.set_max(node, min_v); - self.update_trackers.sync_ack_map.set_max(node, min_v); - } - } - } - - // Update the current_min value in trackers if anything changed - if changed { - self.update_trackers_min(); } // Merge staged layout changes @@ -280,7 +355,6 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: self.staging.get().parameters.clone(), roles: LwwMap::new(), }); - self.update_hashes(); Ok((self, msg)) } @@ -290,20 +364,11 @@ To know the correct value of the new layout version, invoke `garage layout show` parameters: Lww::new(self.current().parameters.clone()), roles: LwwMap::new(), }); - self.update_hashes(); Ok(self) } pub fn check(&self) -> Result<(), String> { - // Check that the hash of the staging data is correct - if self.trackers_hash != self.calculate_trackers_hash() { - return Err("trackers_hash is incorrect".into()); - } - if self.staging_hash != self.calculate_staging_hash() { - return Err("staging_hash is incorrect".into()); - } - for version in self.versions.iter() { version.check()?; } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 21ec2d8da..e270ad21e 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -24,7 +24,7 @@ pub struct LayoutManager { replication_factor: usize, persist_cluster_layout: Persister, - layout: Arc>, + layout: Arc>, pub(crate) change_notify: Arc, table_sync_version: Mutex>, @@ -54,7 +54,7 @@ impl LayoutManager { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); - let mut cluster_layout = match persist_cluster_layout.load() { + let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { if x.current().replication_factor != replication_factor { return Err(Error::Message(format!( @@ -74,6 +74,7 @@ impl LayoutManager { } }; + let mut cluster_layout = LayoutHelper::new(cluster_layout); cluster_layout.update_trackers_of(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -100,7 +101,7 @@ impl LayoutManager { // ---- PUBLIC INTERFACE ---- - pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { self.layout.read().unwrap() } @@ -108,8 +109,8 @@ impl LayoutManager { let layout = self.layout(); LayoutStatus { cluster_layout_version: layout.current().version, - cluster_layout_trackers_hash: layout.trackers_hash, - cluster_layout_staging_hash: layout.staging_hash, + cluster_layout_trackers_hash: layout.trackers_hash(), + cluster_layout_staging_hash: layout.staging_hash(), } } @@ -137,13 +138,8 @@ impl LayoutManager { drop(table_sync_version); let mut layout = self.layout.write().unwrap(); - if layout - .update_trackers - .sync_map - .set_max(self.node_id, sync_until) - { + if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) { debug!("sync_until updated to {}", sync_until); - layout.update_hashes(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), )); @@ -157,7 +153,7 @@ impl LayoutManager { let prev_layout_check = layout.check().is_ok(); if !prev_layout_check || adv.check().is_ok() { - if layout.merge(adv) { + if layout.update(|l| l.merge(adv)) { layout.update_trackers_of(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); @@ -171,7 +167,7 @@ impl LayoutManager { fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { - if layout.update_trackers.merge(adv) { + if layout.update(|l| l.update_trackers.merge(adv)) { layout.update_trackers_of(self.node_id); return Some(layout.update_trackers.clone()); } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 969f5a0b1..00a2c017f 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -188,7 +188,7 @@ mod v010 { use super::v09; use crate::layout::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; - use garage_util::data::{Hash, Uuid}; + use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; @@ -202,13 +202,9 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Hash of the update trackers - pub trackers_hash: Hash, /// Staged changes for the next version pub staging: Lww, - /// Hash of the serialized staging_parameters + staging_roles - pub staging_hash: Hash, } /// A version of the layout of the cluster, i.e. the list of roles @@ -260,10 +256,7 @@ mod v010 { /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker { - pub values: BTreeMap, - pub current_min: u64, - } + pub struct UpdateTracker(pub BTreeMap); impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; @@ -293,32 +286,27 @@ mod v010 { nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; - let update_tracker = UpdateTracker { - values: version + let update_tracker = UpdateTracker( + version .nongateway_nodes() .iter() .copied() .map(|x| (x, version.version)) .collect::>(), - current_min: 0, - }; + ); let staging = LayoutStaging { parameters: previous.staging_parameters, roles: previous.staging_roles, }; - let mut ret = Self { + Self { versions: vec![version], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - trackers_hash: [0u8; 32].into(), staging: Lww::raw(previous.version, staging), - staging_hash: [0u8; 32].into(), - }; - ret.update_hashes(); - ret + } } } } @@ -382,14 +370,14 @@ impl core::str::FromStr for ZoneRedundancy { impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; - for (k, v) in other.values.iter() { - if let Some(v_mut) = self.values.get_mut(k) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { if *v > *v_mut { *v_mut = *v; changed = true; } } else { - self.values.insert(*k, *v); + self.0.insert(*k, *v); changed = true; } } @@ -397,23 +385,23 @@ impl UpdateTracker { } pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.values.get_mut(&peer) { + match self.0.get_mut(&peer) { Some(e) if *e < value => { *e = value; true } None => { - self.values.insert(peer, value); + self.0.insert(peer, value); true } _ => false, } } - fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { - self.current_min = storage_nodes + pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + storage_nodes .iter() - .map(|x| self.values.get(x).copied().unwrap_or(min_version)) + .map(|x| self.0.get(x).copied().unwrap_or(min_version)) .min() .unwrap_or(min_version) } @@ -426,10 +414,4 @@ impl UpdateTrackers { let c3 = self.sync_ack_map.merge(&other.sync_ack_map); c1 || c2 || c3 } - - pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { - self.ack_map.update_min(&storage_nodes, min_version); - self.sync_map.update_min(&storage_nodes, min_version); - self.sync_ack_map.update_min(&storage_nodes, min_version); - } } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 1bad495bd..e269ddaad 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -26,7 +26,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; -use crate::layout::LayoutHistory; +use crate::layout::LayoutHelper; use crate::metrics::RpcMetrics; // Default RPC timeout = 5 minutes @@ -90,7 +90,7 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - layout: Arc>, + layout: Arc>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -99,7 +99,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - layout: Arc>, + layout: Arc>, rpc_timeout: Option, ) -> Self { let metrics = RpcMetrics::new(); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 31d78bf6a..d74dc2a19 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::manager::{LayoutManager, LayoutStatus}; -use crate::layout::{self, LayoutHistory, NodeRoleV}; +use crate::layout::{self, LayoutHelper, LayoutHistory, NodeRoleV}; use crate::replication_mode::*; use crate::rpc_helper::*; @@ -350,7 +350,7 @@ impl System { // ---- Public utilities / accessors ---- - pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { self.layout_manager.layout() }