diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 2346b14a4..1684918e6 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -9,6 +9,7 @@ use garage_util::error::*; use super::schema::*; use super::*; + impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); @@ -49,7 +50,7 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- pub fn all_ack(&self) -> u64 { - self.calculate_global_min(&self.update_trackers.ack_map) + self.update_trackers.ack_map.current_min } pub fn min_stored(&self) -> u64 { @@ -91,7 +92,7 @@ impl LayoutHistory { } pub fn read_nodes_of(&self, position: &Hash) -> Vec { - let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); + let sync_min = self.update_trackers.sync_map.current_min; let version = self .versions .iter() @@ -122,7 +123,7 @@ impl LayoutHistory { // ------------------ update tracking --------------- - pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version in the history @@ -138,6 +139,9 @@ impl LayoutHistory { // 4. Cleanup layout versions that are not needed anymore self.cleanup_old_versions(); + // 5. Recalculate global minima + self.update_trackers_min(); + info!("ack_map: {:?}", self.update_trackers.ack_map); info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); @@ -146,42 +150,41 @@ impl LayoutHistory { self.update_hashes(); } + fn update_trackers_min(&mut self) { + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes().into_owned(); + let min_version = self.versions.first().unwrap().version; + self.update_trackers.update_min(&storage_nodes, min_version); + } + pub(crate) fn ack_last(&mut self, node: Uuid) { let last_version = self.current().version; self.update_trackers.ack_map.set_max(node, last_version); + self.update_trackers_min(); } pub(crate) fn sync_first(&mut self, node: Uuid) { let first_version = self.versions.first().as_ref().unwrap().version; self.update_trackers.sync_map.set_max(node, first_version); + self.update_trackers_min(); } pub(crate) fn sync_ack(&mut self, node: Uuid) { - self.update_trackers.sync_ack_map.set_max( - node, - self.calculate_global_min(&self.update_trackers.sync_map), - ); + self.update_trackers + .sync_ack_map + .set_max(node, self.update_trackers.sync_map.current_min); + self.update_trackers_min(); } pub(crate) fn cleanup_old_versions(&mut self) { - let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + let min_sync_ack = self.update_trackers.sync_ack_map.current_min; while self.versions.first().as_ref().unwrap().version < min_sync_ack { let removed = self.versions.remove(0); info!("Layout history: pruning old version {}", removed.version); } } - pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - // TODO: for TableFullReplication, counting gateway nodes might be - // necessary? Think about this more. - let storage_nodes = self.all_nongateway_nodes(); - storage_nodes - .iter() - .map(|x| tracker.0.get(x).copied().unwrap_or(0)) - .min() - .unwrap_or(0) - } - // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -229,6 +232,11 @@ impl LayoutHistory { } } + // Update the current_min value in trackers if anything changed + if changed { + self.update_trackers_min(); + } + // Merge staged layout changes if self.staging != other.staging { self.staging.merge(&other.staging); diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index ce8b6f61c..21ec2d8da 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -74,7 +74,7 @@ impl LayoutManager { } }; - cluster_layout.update_trackers(node_id.into()); + cluster_layout.update_trackers_of(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -158,7 +158,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.merge(adv) { - layout.update_trackers(self.node_id); + layout.update_trackers_of(self.node_id); if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -172,7 +172,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update_trackers.merge(adv) { - layout.update_trackers(self.node_id); + layout.update_trackers_of(self.node_id); return Some(layout.update_trackers.clone()); } } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 79440a471..969f5a0b1 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -260,7 +260,10 @@ mod v010 { /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] - pub struct UpdateTracker(pub BTreeMap); + pub struct UpdateTracker { + pub values: BTreeMap, + pub current_min: u64, + } impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; @@ -290,14 +293,15 @@ mod v010 { nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; - let update_tracker = UpdateTracker( - version + let update_tracker = UpdateTracker { + values: version .nongateway_nodes() .iter() .copied() .map(|x| (x, version.version)) .collect::>(), - ); + current_min: 0, + }; let staging = LayoutStaging { parameters: previous.staging_parameters, roles: previous.staging_roles, @@ -378,14 +382,14 @@ impl core::str::FromStr for ZoneRedundancy { impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { + for (k, v) in other.values.iter() { + if let Some(v_mut) = self.values.get_mut(k) { if *v > *v_mut { *v_mut = *v; changed = true; } } else { - self.0.insert(*k, *v); + self.values.insert(*k, *v); changed = true; } } @@ -393,18 +397,26 @@ impl UpdateTracker { } pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { - match self.0.get_mut(&peer) { + match self.values.get_mut(&peer) { Some(e) if *e < value => { *e = value; true } None => { - self.0.insert(peer, value); + self.values.insert(peer, value); true } _ => false, } } + + fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { + self.current_min = storage_nodes + .iter() + .map(|x| self.values.get(x).copied().unwrap_or(min_version)) + .min() + .unwrap_or(min_version) + } } impl UpdateTrackers { @@ -414,4 +426,10 @@ impl UpdateTrackers { let c3 = self.sync_ack_map.merge(&other.sync_ack_map); c1 || c2 || c3 } + + pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) { + self.ack_map.update_min(&storage_nodes, min_version); + self.sync_map.update_min(&storage_nodes, min_version); + self.sync_ack_map.update_min(&storage_nodes, min_version); + } }