[next-0.10] layout helper: rename & clarify updates to update trackers

This commit is contained in:
Alex 2024-03-27 13:47:06 +01:00
parent 32f1786f9f
commit 4eba32f29f
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
2 changed files with 34 additions and 45 deletions
src/rpc/layout

View file

@ -238,19 +238,31 @@ impl LayoutHelper {
// ------------------ helpers for update tracking --------------- // ------------------ helpers for update tracking ---------------
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) { pub(crate) fn update_update_trackers(&mut self, local_node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date // Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version which is not currently // 1. Acknowledge the last layout version which is not currently
// locked by an in-progress write operation // locked by an in-progress write operation
self.ack_max_free(local_node_id); self.update_ack_to_max_free(local_node_id);
// 2. Assume the data on this node is sync'ed up at least to // 2. Assume the data on this node is sync'ed up at least to
// the first layout version in the history // the first layout version in the history
self.sync_first(local_node_id); let first_version = self.inner().min_stored();
self.update(|layout| {
layout
.update_trackers
.sync_map
.set_max(local_node_id, first_version)
});
// 3. Acknowledge everyone has synced up to min(self.sync_map) // 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(local_node_id); let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
.set_max(local_node_id, sync_map_min)
});
debug!("ack_map: {:?}", self.inner().update_trackers.ack_map); debug!("ack_map: {:?}", self.inner().update_trackers.ack_map);
debug!("sync_map: {:?}", self.inner().update_trackers.sync_map); debug!("sync_map: {:?}", self.inner().update_trackers.sync_map);
@ -260,42 +272,9 @@ impl LayoutHelper {
); );
} }
fn sync_first(&mut self, local_node_id: Uuid) { pub(crate) fn update_ack_to_max_free(&mut self, local_node_id: Uuid) -> bool {
let first_version = self.inner().min_stored(); let max_free = self
self.update(|layout| { .versions()
layout
.update_trackers
.sync_map
.set_max(local_node_id, first_version)
});
}
fn sync_ack(&mut self, local_node_id: Uuid) {
let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
.set_max(local_node_id, sync_map_min)
});
}
pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
let max_ack = self.max_free_ack();
let changed = self.update(|layout| {
layout
.update_trackers
.ack_map
.set_max(local_node_id, max_ack)
});
if changed {
info!("ack_until updated to {}", max_ack);
}
changed
}
pub(crate) fn max_free_ack(&self) -> u64 {
self.versions()
.iter() .iter()
.map(|x| x.version) .map(|x| x.version)
.skip_while(|v| { .skip_while(|v| {
@ -305,6 +284,16 @@ impl LayoutHelper {
.unwrap_or(true) .unwrap_or(true)
}) })
.next() .next()
.unwrap_or(self.current().version) .unwrap_or(self.current().version);
let changed = self.update(|layout| {
layout
.update_trackers
.ack_map
.set_max(local_node_id, max_free)
});
if changed {
info!("ack_until updated to {}", max_free);
}
changed
} }
} }

View file

@ -70,7 +70,7 @@ impl LayoutManager {
cluster_layout, cluster_layout,
Default::default(), Default::default(),
); );
cluster_layout.update_trackers(node_id.into()); cluster_layout.update_update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout)); let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new()); let change_notify = Arc::new(Notify::new());
@ -134,7 +134,7 @@ impl LayoutManager {
fn ack_new_version(self: &Arc<Self>) { fn ack_new_version(self: &Arc<Self>) {
let mut layout = self.layout.write().unwrap(); let mut layout = self.layout.write().unwrap();
if layout.ack_max_free(self.node_id) { if layout.update_ack_to_max_free(self.node_id) {
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.inner().update_trackers.clone(), layout.inner().update_trackers.clone(),
)); ));
@ -164,7 +164,7 @@ impl LayoutManager {
if !prev_layout_check || adv.check().is_ok() { if !prev_layout_check || adv.check().is_ok() {
if layout.update(|l| l.merge(adv)) { if layout.update(|l| l.merge(adv)) {
layout.update_trackers(self.node_id); layout.update_update_trackers(self.node_id);
if prev_layout_check && !layout.is_check_ok() { if prev_layout_check && !layout.is_check_ok() {
panic!("Merged two correct layouts and got an incorrect layout."); panic!("Merged two correct layouts and got an incorrect layout.");
} }
@ -182,7 +182,7 @@ impl LayoutManager {
if layout.inner().update_trackers != *adv { if layout.inner().update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) { if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers(self.node_id); layout.update_update_trackers(self.node_id);
assert!(layout.digest() != prev_digest); assert!(layout.digest() != prev_digest);
return Some(layout.inner().update_trackers.clone()); return Some(layout.inner().update_trackers.clone());
} }