diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 185dbb27..cef56647 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -131,7 +131,8 @@ impl LayoutHistory { pub(crate) fn cleanup_old_versions(&mut self) { let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); while self.versions.first().as_ref().unwrap().version < min_sync_ack { - self.versions.remove(0); + let removed = self.versions.remove(0); + info!("Layout history: pruning old version {}", removed.version); } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 7d60bae6..ce8b6f61 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -133,7 +133,7 @@ impl LayoutManager { pub fn sync_table_until(self: &Arc, table_name: &'static str, version: u64) { let mut table_sync_version = self.table_sync_version.lock().unwrap(); *table_sync_version.get_mut(table_name).unwrap() = version; - let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap(); + let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap(); drop(table_sync_version); let mut layout = self.layout.write().unwrap(); @@ -142,6 +142,7 @@ impl LayoutManager { .sync_map .set_max(self.node_id, sync_until) { + debug!("sync_until updated to {}", sync_until); layout.update_hashes(); self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers( layout.update_trackers.clone(), @@ -277,7 +278,12 @@ impl LayoutManager { self: &Arc, adv: &LayoutHistory, ) -> Result { - debug!("handle_advertise_cluster_layout: {:?}", adv); + debug!( + "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}", + adv.versions.len(), + adv.current().version, + adv.update_trackers + ); if adv.current().replication_factor != self.replication_factor { let msg = format!( diff --git a/src/table/sync.rs b/src/table/sync.rs index 43636faa..8c21db8b 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -488,8 +488,29 @@ struct SyncWorker { } impl SyncWorker { + fn check_add_full_sync(&mut self) { + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != self.layout_versions { + self.layout_versions = layout_versions; + info!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); + self.add_full_sync(); + } + } + fn add_full_sync(&mut self) { let mut partitions = self.syncer.data.replication.sync_partitions(); + info!( + "{}: Adding full sync for ack layout version {}", + F::TABLE_NAME, + partitions.layout_version + ); + partitions.partitions.shuffle(&mut thread_rng()); self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; @@ -510,6 +531,8 @@ impl Worker for SyncWorker { } async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + self.check_add_full_sync(); + if let Some(todo) = &mut self.todo { let partition = todo.partitions.pop().unwrap(); @@ -531,19 +554,23 @@ impl Worker for SyncWorker { return Err(e); } - // done - if !todo.partitions.is_empty() { - return Ok(WorkerState::Busy); + if todo.partitions.is_empty() { + info!( + "{}: Completed full sync for ack layout version {}", + F::TABLE_NAME, + todo.layout_version + ); + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); + self.todo = None; } - self.syncer - .system - .layout_manager - .sync_table_until(F::TABLE_NAME, todo.layout_version); + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) } - - self.todo = None; - Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -554,18 +581,7 @@ impl Worker for SyncWorker { } }, _ = self.layout_notify.notified() => { - let layout_versions = self.syncer.system.cluster_layout().sync_versions(); - if layout_versions != self.layout_versions { - self.layout_versions = layout_versions; - debug!( - "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", - F::TABLE_NAME, - layout_versions.0, - layout_versions.1, - layout_versions.2 - ); - self.add_full_sync(); - } + self.check_add_full_sync(); }, _ = tokio::time::sleep_until(self.next_full_sync.into()) => { self.add_full_sync();