diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 357b9d62..347f03db 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -30,6 +32,14 @@ impl LayoutHistory { self.versions.last().as_ref().unwrap() } + pub fn all_storage_nodes(&self) -> HashSet { + self.versions + .iter() + .map(|x| x.nongateway_nodes()) + .flatten() + .collect::>() + } + pub(crate) fn update_hashes(&mut self) { self.trackers_hash = self.calculate_trackers_hash(); self.staging_hash = self.calculate_staging_hash(); @@ -43,6 +53,65 @@ impl LayoutHistory { blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } + // ------------------ update tracking --------------- + + pub(crate) fn update_trackers(&mut self, node_id: Uuid) { + // Ensure trackers for this node's values are up-to-date + + // 1. Acknowledge the last layout version in the history + self.ack_last(node_id); + + // 2. Assume the data on this node is sync'ed up at least to + // the first layout version in the history + self.sync_first(node_id); + + // 3. Acknowledge everyone has synced up to min(self.sync_map) + self.sync_ack(node_id); + + // 4. Cleanup layout versions that are not needed anymore + self.cleanup_old_versions(); + + info!("ack_map: {:?}", self.update_trackers.ack_map); + info!("sync_map: {:?}", self.update_trackers.sync_map); + info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + + // Finally, update hashes + self.update_hashes(); + } + + pub(crate) fn ack_last(&mut self, node: Uuid) { + let last_version = self.current().version; + self.update_trackers.ack_map.set_max(node, last_version); + } + + pub(crate) fn sync_first(&mut self, node: Uuid) { + let first_version = self.versions.first().as_ref().unwrap().version; + self.update_trackers.sync_map.set_max(node, first_version); + } + + pub(crate) fn sync_ack(&mut self, node: Uuid) { + self.update_trackers.sync_ack_map.set_max( + node, + self.calculate_global_min(&self.update_trackers.sync_map), + ); + } + + pub(crate) fn cleanup_old_versions(&mut self) { + let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); + while self.versions.first().as_ref().unwrap().version < min_sync_ack { + self.versions.remove(0); + } + } + + pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { + let storage_nodes = self.all_storage_nodes(); + storage_nodes + .iter() + .map(|x| tracker.0.get(x).copied().unwrap_or(0)) + .min() + .unwrap_or(0) + } + // ================== updates to layout, public interface =================== pub fn merge(&mut self, other: &LayoutHistory) -> bool { @@ -78,11 +147,6 @@ impl LayoutHistory { changed = true; } - // Update hashes if there are changes - if changed { - self.update_hashes(); - } - changed } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index a2502f58..ffcc938b 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -51,7 +51,7 @@ impl LayoutManager { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); - let cluster_layout = match persist_cluster_layout.load() { + let mut cluster_layout = match persist_cluster_layout.load() { Ok(x) => { if x.current().replication_factor != replication_factor { return Err(Error::Message(format!( @@ -71,6 +71,8 @@ impl LayoutManager { } }; + cluster_layout.update_trackers(node_id.into()); + let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -126,7 +128,7 @@ impl LayoutManager { if prev_layout_check && layout.check().is_err() { panic!("Merged two correct layouts and got an incorrect layout."); } - + layout.update_trackers(self.node_id); return Some(layout.clone()); } } @@ -137,6 +139,7 @@ impl LayoutManager { let mut layout = self.layout.write().unwrap(); if layout.update_trackers != *adv { if layout.update_trackers.merge(adv) { + layout.update_trackers(self.node_id); return Some(layout.update_trackers.clone()); } } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index abae5bd8..9f5d6f62 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -3,6 +3,7 @@ use std::fmt; use bytesize::ByteSize; use garage_util::crdt::{AutoCrdt, Crdt}; +use garage_util::data::Uuid; mod v08 { use crate::layout::CompactNodeType; @@ -276,8 +277,7 @@ mod v010 { let update_tracker = UpdateTracker( version .nongateway_nodes() - .iter() - .map(|x| (*x, version.version)) + .map(|x| (x, version.version)) .collect::>(), ); let staging = LayoutStaging { @@ -375,8 +375,15 @@ impl UpdateTracker { changed } - pub(crate) fn min(&self) -> u64 { - self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) { + match self.0.get_mut(&peer) { + Some(e) => { + *e = std::cmp::max(*e, value); + } + None => { + self.0.insert(peer, value); + } + } } } diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 6918fdf9..65c62f63 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -134,15 +134,14 @@ impl LayoutVersion { // ===================== internal information extractors ====================== /// Returns the uuids of the non_gateway nodes in self.node_id_vec. - pub(crate) fn nongateway_nodes(&self) -> Vec { - let mut result = Vec::::new(); - for uuid in self.node_id_vec.iter() { - match self.node_role(uuid) { - Some(role) if role.capacity.is_some() => result.push(*uuid), - _ => (), - } - } - result + pub(crate) fn nongateway_nodes(&self) -> impl Iterator + '_ { + self.node_id_vec + .iter() + .copied() + .filter(move |uuid| match self.node_role(uuid) { + Some(role) if role.capacity.is_some() => true, + _ => false, + }) } /// Given a node uuids, this function returns the label of its zone @@ -158,8 +157,8 @@ impl LayoutVersion { /// Returns the sum of capacities of non gateway nodes in the cluster fn get_total_capacity(&self) -> Result { let mut total_capacity = 0; - for uuid in self.nongateway_nodes().iter() { - total_capacity += self.get_node_capacity(uuid)?; + for uuid in self.nongateway_nodes() { + total_capacity += self.get_node_capacity(&uuid)?; } Ok(total_capacity) } @@ -320,7 +319,7 @@ impl LayoutVersion { // to use them as indices in the flow graphs. let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?; - let nb_nongateway_nodes = self.nongateway_nodes().len(); + let nb_nongateway_nodes = self.nongateway_nodes().count(); if nb_nongateway_nodes < self.replication_factor { return Err(Error::Message(format!( "The number of nodes with positive \ @@ -479,7 +478,8 @@ impl LayoutVersion { let mut id_to_zone = Vec::::new(); let mut zone_to_id = HashMap::::new(); - for uuid in self.nongateway_nodes().iter() { + let nongateway_nodes = self.nongateway_nodes().collect::>(); + for uuid in nongateway_nodes.iter() { let r = self.node_role(uuid).unwrap(); if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() { zone_to_id.insert(r.zone.clone(), id_to_zone.len()); @@ -556,8 +556,10 @@ impl LayoutVersion { exclude_assoc: &HashSet<(usize, usize)>, zone_redundancy: usize, ) -> Result, Error> { - let vertices = - LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len()); + let vertices = LayoutVersion::generate_graph_vertices( + zone_to_id.len(), + self.nongateway_nodes().count(), + ); let mut g = Graph::::new(&vertices); let nb_zones = zone_to_id.len(); for p in 0..NB_PARTITIONS { @@ -576,7 +578,7 @@ impl LayoutVersion { )?; } } - for n in 0..self.nongateway_nodes().len() { + for n in 0..self.nongateway_nodes().count() { let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?; let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?]; g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?; @@ -600,7 +602,7 @@ impl LayoutVersion { // previous assignment let mut exclude_edge = HashSet::<(usize, usize)>::new(); if let Some(prev_assign) = prev_assign_opt { - let nb_nodes = self.nongateway_nodes().len(); + let nb_nodes = self.nongateway_nodes().count(); for (p, prev_assign_p) in prev_assign.iter().enumerate() { for n in 0..nb_nodes { exclude_edge.insert((p, n)); @@ -652,7 +654,7 @@ impl LayoutVersion { // We compute the maximal length of a simple path in gflow. It is used in the // Bellman-Ford algorithm in optimize_flow_with_cost to set the number // of iterations. - let nb_nodes = self.nongateway_nodes().len(); + let nb_nodes = self.nongateway_nodes().count(); let path_length = 4 * nb_nodes; gflow.optimize_flow_with_cost(&cost, path_length)?; @@ -730,7 +732,7 @@ impl LayoutVersion { } // We define and fill in the following tables - let storing_nodes = self.nongateway_nodes(); + let storing_nodes = self.nongateway_nodes().collect::>(); let mut new_partitions = vec![0; storing_nodes.len()]; let mut stored_partitions = vec![0; storing_nodes.len()]; @@ -873,9 +875,9 @@ mod tests { for z in zones.iter() { zone_token.insert(z.clone(), 0); } - for uuid in cl.nongateway_nodes().iter() { - let z = cl.get_node_zone(uuid)?; - let c = cl.get_node_capacity(uuid)?; + for uuid in cl.nongateway_nodes() { + let z = cl.get_node_zone(&uuid)?; + let c = cl.get_node_capacity(&uuid)?; zone_token.insert( z.clone(), zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),