diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index b15f75409..3a033ab2b 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -238,19 +238,31 @@ impl LayoutHelper { // ------------------ helpers for update tracking --------------- - pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { + pub(crate) fn update_update_trackers(&mut self, local_node_id: Uuid) { // Ensure trackers for this node's values are up-to-date // 1. Acknowledge the last layout version which is not currently // locked by an in-progress write operation - self.ack_max_free(local_node_id); + self.update_ack_to_max_free(local_node_id); // 2. Assume the data on this node is sync'ed up at least to // the first layout version in the history - self.sync_first(local_node_id); + let first_version = self.inner().min_stored(); + self.update(|layout| { + layout + .update_trackers + .sync_map + .set_max(local_node_id, first_version) + }); // 3. Acknowledge everyone has synced up to min(self.sync_map) - self.sync_ack(local_node_id); + let sync_map_min = self.sync_map_min; + self.update(|layout| { + layout + .update_trackers + .sync_ack_map + .set_max(local_node_id, sync_map_min) + }); debug!("ack_map: {:?}", self.inner().update_trackers.ack_map); debug!("sync_map: {:?}", self.inner().update_trackers.sync_map); @@ -260,42 +272,9 @@ impl LayoutHelper { ); } - fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.inner().min_stored(); - self.update(|layout| { - layout - .update_trackers - .sync_map - .set_max(local_node_id, first_version) - }); - } - - fn sync_ack(&mut self, local_node_id: Uuid) { - let sync_map_min = self.sync_map_min; - self.update(|layout| { - layout - .update_trackers - .sync_ack_map - .set_max(local_node_id, sync_map_min) - }); - } - - pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool { - let max_ack = self.max_free_ack(); - let changed = self.update(|layout| { - layout - .update_trackers - .ack_map - .set_max(local_node_id, max_ack) - }); - if changed { - info!("ack_until updated to {}", max_ack); - } - changed - } - - pub(crate) fn max_free_ack(&self) -> u64 { - self.versions() + pub(crate) fn update_ack_to_max_free(&mut self, local_node_id: Uuid) -> bool { + let max_free = self + .versions() .iter() .map(|x| x.version) .skip_while(|v| { @@ -305,6 +284,16 @@ impl LayoutHelper { .unwrap_or(true) }) .next() - .unwrap_or(self.current().version) + .unwrap_or(self.current().version); + let changed = self.update(|layout| { + layout + .update_trackers + .ack_map + .set_max(local_node_id, max_free) + }); + if changed { + info!("ack_until updated to {}", max_free); + } + changed } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 0ca532baa..a0dcf50e6 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -70,7 +70,7 @@ impl LayoutManager { cluster_layout, Default::default(), ); - cluster_layout.update_trackers(node_id.into()); + cluster_layout.update_update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); let change_notify = Arc::new(Notify::new()); @@ -134,7 +134,7 @@ impl LayoutManager { fn ack_new_version(self: &Arc) { let mut layout = self.layout.write().unwrap(); - if layout.ack_max_free(self.node_id) { + if layout.update_ack_to_max_free(self.node_id) { self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.inner().update_trackers.clone(), )); @@ -164,7 +164,7 @@ impl LayoutManager { if !prev_layout_check || adv.check().is_ok() { if layout.update(|l| l.merge(adv)) { - layout.update_trackers(self.node_id); + layout.update_update_trackers(self.node_id); if prev_layout_check && !layout.is_check_ok() { panic!("Merged two correct layouts and got an incorrect layout."); } @@ -182,7 +182,7 @@ impl LayoutManager { if layout.inner().update_trackers != *adv { if layout.update(|l| l.update_trackers.merge(adv)) { - layout.update_trackers(self.node_id); + layout.update_update_trackers(self.node_id); assert!(layout.digest() != prev_digest); return Some(layout.inner().update_trackers.clone()); }