use rand::Rng; use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; use futures::{pin_mut, select}; use futures_util::future::*; use tokio::sync::watch; use tokio::sync::Mutex; use crate::data::*; use crate::error::Error; use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); pub struct TableSyncer { pub table: Arc>, pub todo: Mutex, } pub struct SyncTodo { pub todo: Vec, } #[derive(Debug, Clone)] pub struct Partition { pub begin: Hash, pub end: Hash, pub retain: bool, } impl TableSyncer { pub async fn launch(table: Arc>) -> Arc { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), }); let s1 = syncer.clone(); table .system .background .spawn_worker(move |must_exit: watch::Receiver| s1.watcher_task(must_exit)) .await; let s2 = syncer.clone(); table .system .background .spawn_worker(move |must_exit: watch::Receiver| s2.syncer_task(must_exit)) .await; syncer } async fn watcher_task( self: Arc, mut must_exit: watch::Receiver, ) -> Result<(), Error> { self.todo.lock().await.add_full_scan(&self.table); let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); let mut prev_ring: Arc = self.table.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver> = self.table.system.ring.clone(); loop { let s_ring_recv = ring_recv.recv().fuse(); let s_must_exit = must_exit.recv().fuse(); pin_mut!(s_ring_recv, s_must_exit); select! { _ = next_full_scan => { next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); self.todo.lock().await.add_full_scan(&self.table); } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { return Ok(()) } } } } } async fn syncer_task( self: Arc, mut must_exit: watch::Receiver, ) -> Result<(), Error> { loop { let s_pop_task = self.pop_task().fuse(); let s_must_exit = must_exit.recv().fuse(); pin_mut!(s_must_exit, s_pop_task); select! { task = s_pop_task => { if let Some(partition) = task { let res = self.sync_partition(&partition).await; if let Err(e) = res { eprintln!("Error while syncing {:?}: {}", partition, e); } } else { tokio::time::delay_for(Duration::from_secs(1)).await; } } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { return Ok(()) } } } } } async fn pop_task(&self) -> Option { self.todo.lock().await.pop_task() } async fn sync_partition(self: &Arc, partition: &Partition) -> Result<(), Error> { eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); Ok(()) } } impl SyncTodo { fn add_full_scan(&mut self, table: &Table) { let my_id = table.system.id.clone(); self.todo.clear(); let ring: Arc = table.system.ring.borrow().clone(); for i in 0..ring.ring.len() { let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); let begin = ring.ring[i].location.clone(); if i == 0 { self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); } if i == ring.ring.len() - 1 { self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); } else { let end = ring.ring[i + 1].location.clone(); self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); } } } fn add_full_scan_aux( &mut self, table: &Table, begin: Hash, end: Hash, nodes: &[UUID], my_id: &UUID, ) { let retain = nodes.contains(my_id); if !retain { // Check if we have some data to send, otherwise skip if table .store .range(begin.clone()..end.clone()) .next() .is_none() { return; } } self.todo.push(Partition { begin, end, retain }); } fn add_ring_difference(&mut self, table: &Table, old: &Ring, new: &Ring) { let my_id = table.system.id.clone(); let old_ring = ring_points(old); let new_ring = ring_points(new); let both_ring = old_ring.union(&new_ring).cloned().collect::>(); let prev_todo_begin = self .todo .iter() .map(|x| x.begin.clone()) .collect::>(); let prev_todo_end = self .todo .iter() .map(|x| x.end.clone()) .collect::>(); let prev_todo = prev_todo_begin .union(&prev_todo_end) .cloned() .collect::>(); let all_points = both_ring.union(&prev_todo).cloned().collect::>(); self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); let mut new_todo = vec![]; for i in 0..all_points.len() - 1 { let begin = all_points[i].clone(); let end = all_points[i + 1].clone(); let was_ours = old .walk_ring(&begin, table.param.replication_factor) .contains(&my_id); let is_ours = new .walk_ring(&begin, table.param.replication_factor) .contains(&my_id); let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { Ok(_) => true, Err(j) => { (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) || (j < self.todo.len() && self.todo[j].begin < end && begin < self.todo[j].end) } }; if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { new_todo.push(Partition { begin, end, retain: is_ours, }); } } self.todo = new_todo; } fn pop_task(&mut self) -> Option { if self.todo.is_empty() { return None; } let i = rand::thread_rng().gen_range::(0, self.todo.len()); if i == self.todo.len() - 1 { self.todo.pop() } else { let replacement = self.todo.pop().unwrap(); let ret = std::mem::replace(&mut self.todo[i], replacement); Some(ret) } } } fn ring_points(ring: &Ring) -> BTreeSet { let mut ret = BTreeSet::new(); ret.insert([0u8; 32].into()); ret.insert([0xFFu8; 32].into()); for i in 0..ring.ring.len() { ret.insert(ring.ring[i].location.clone()); } ret }