diff --git a/src/background.rs b/src/background.rs index 772745a6..f4b889ea 100644 --- a/src/background.rs +++ b/src/background.rs @@ -14,7 +14,7 @@ type Job = Pin + Send>>; pub struct BackgroundRunner { n_runners: usize, - stop_signal: watch::Receiver, + pub stop_signal: watch::Receiver, queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_out: Mutex>, diff --git a/src/table.rs b/src/table.rs index 533b4291..99ac77bb 100644 --- a/src/table.rs +++ b/src/table.rs @@ -4,6 +4,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::*; +use tokio::sync::RwLock; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -12,7 +13,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; -use crate::table_sync::TableSyncer; +use crate::table_sync::*; pub struct Table { pub instance: F, @@ -21,6 +22,7 @@ pub struct Table { pub system: Arc, pub store: sled::Tree, + pub syncer: RwLock>>>, pub param: TableReplicationParams, } @@ -59,6 +61,9 @@ pub enum TableRPC { ReadEntryResponse(Option), Update(Vec>), + + SyncChecksums(Vec), + SyncDifferentSet(Vec), } pub trait PartitionKey { @@ -132,8 +137,10 @@ impl Table { system, store, param, + syncer: RwLock::new(None), }); - TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()).await; + *table.syncer.write().await = Some(syncer); table } @@ -309,6 +316,11 @@ impl Table { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } + TableRPC::SyncChecksums(checksums) => { + let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?; + Ok(TableRPC::SyncDifferentSet(differing)) + } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } } @@ -353,6 +365,11 @@ impl Table { Ok(()) } + pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + // TODO + Ok(()) + } + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); diff --git a/src/table_sync.rs b/src/table_sync.rs index 039dab6d..3dd9df33 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,12 +1,15 @@ use rand::Rng; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap, VecDeque}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures::{pin_mut, select}; +use futures::future::BoxFuture; +use futures_util::stream::*; use futures_util::future::*; use tokio::sync::watch; use tokio::sync::Mutex; +use serde::{Serialize, Deserialize}; use crate::data::*; use crate::error::Error; @@ -14,11 +17,12 @@ use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); pub struct TableSyncer { pub table: Arc>, - pub todo: Mutex, + pub cache: Vec>>, } pub struct SyncTodo { @@ -32,12 +36,30 @@ pub struct Partition { pub retain: bool, } +#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct SyncRange { + pub begin: Vec, + pub end: Vec, + pub level: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RangeChecksum { + pub bounds: SyncRange, + pub children: Vec<(SyncRange, Hash)>, + pub found_limit: Option>, + + #[serde(skip, default="std::time::Instant::now")] + pub time: Instant, +} + impl TableSyncer { pub async fn launch(table: Arc>) -> Arc { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), + cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::>(), }); let s1 = syncer.clone(); @@ -95,38 +117,176 @@ impl TableSyncer { self: Arc, mut must_exit: watch::Receiver, ) -> Result<(), Error> { - loop { - let s_pop_task = self.pop_task().fuse(); - let s_must_exit = must_exit.recv().fuse(); - pin_mut!(s_must_exit, s_pop_task); + while !*must_exit.borrow() { + if let Some(partition) = self.todo.lock().await.pop_task() { + let res = self.clone().sync_partition(&partition, &mut must_exit).await; + if let Err(e) = res { + eprintln!("Error while syncing {:?}: {}", partition, e); + } + } else { + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + Ok(()) + } - select! { - task = s_pop_task => { - if let Some(partition) = task { - let res = self.sync_partition(&partition).await; - if let Err(e) = res { - eprintln!("Error while syncing {:?}: {}", partition, e); - } - } else { - tokio::time::delay_for(Duration::from_secs(1)).await; + async fn sync_partition(self: Arc, partition: &Partition, must_exit: &mut watch::Receiver) -> Result<(), Error> { + let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; + eprintln!("Root checksum for {:?}: {:?}", partition, root_cks); + + let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); + let mut sync_futures = nodes.iter() + .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())) + .collect::>(); + + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + eprintln!("Sync error: {}", e); + } + } + if !partition.retain { + self.table.delete_range(&partition.begin, &partition.end).await?; + } + + Ok(()) + } + + async fn root_checksum(self: &Arc, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver) -> Result { + for i in 1..32 { + let rc = self.range_checksum(&SyncRange{ + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, must_exit).await?; + if rc.found_limit.is_none() { + return Ok(rc); + } + } + Err(Error::Message(format!("Unable to compute root checksum (this should never happen"))) + } + + fn range_checksum<'a>(self: &'a Arc, range: &'a SyncRange, must_exit: &'a mut watch::Receiver) -> BoxFuture<'a, Result> { + async move { + let mut cache = self.cache[range.level].lock().await; + if let Some(v) = cache.get(&range) { + if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { + return Ok(v.clone()); + } + } + cache.remove(&range); + drop(cache); + + let v = self.range_checksum_inner(&range, must_exit).await?; + + let mut cache = self.cache[range.level].lock().await; + eprintln!("Checksum for {:?}: {:?}", range, v); + cache.insert(range.clone(), v.clone()); + Ok(v) + }.boxed() + } + + async fn range_checksum_inner(self: &Arc, range: &SyncRange, must_exit: &mut watch::Receiver) -> Result { + if range.level == 1 { + let mut children = vec![]; + for item in self.table.store.range(range.begin.clone()..range.end.clone()) { + let (key, value) = item?; + let key_hash = hash(&key[..]); + if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: Some(key.to_vec()), + time: Instant::now(), + }) + } + let item_range = SyncRange{ + begin: key.to_vec(), + end: vec![], + level: 0, + }; + children.push((item_range, hash(&value[..]))); + } + Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: None, + time: Instant::now(), + }) + } else { + let mut children = vec![]; + let mut sub_range = SyncRange{ + begin: range.begin.clone(), + end: range.end.clone(), + level: range.level - 1, + }; + let mut time = Instant::now(); + while !*must_exit.borrow() { + let sub_ck = self.range_checksum(&sub_range, must_exit).await?; + + if sub_ck.children.len() > 0 { + let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]); + children.push((sub_range.clone(), sub_ck_hash)); + if sub_ck.time < time { + time = sub_ck.time; } } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - return Ok(()) + + if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: None, + time, + }); + } + let found_limit = sub_ck.found_limit.unwrap(); + + let actual_limit_hash = hash(&found_limit[..]); + if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: Some(found_limit.clone()), + time, + }); + } + + sub_range.begin = found_limit; + } + Err(Error::Message(format!("Exiting."))) + } + } + + async fn do_sync_with(self: Arc, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver) -> Result<(), Error> { + let mut todo = VecDeque::new(); + todo.push_back(root_ck); + + while !todo.is_empty() && !*must_exit.borrow() { + let end = std::cmp::min(16, todo.len()); + let step = todo.drain(..end).collect::>(); + unimplemented!() + } + Ok(()) + } + + pub async fn handle_checksum_rpc(self: &Arc, checksums: &[RangeChecksum], mut must_exit: watch::Receiver) -> Result, Error> { + let mut ret = vec![]; + for ckr in checksums.iter() { + let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; + for (range, hash) in ckr.children.iter() { + match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { + Err(_) => { + ret.push(range.clone()); + } + Ok(i) => { + if our_ckr.children[i].1 != *hash { + ret.push(range.clone()); + } } } } } - } - - async fn pop_task(&self) -> Option { - self.todo.lock().await.pop_task() - } - - async fn sync_partition(self: &Arc, partition: &Partition) -> Result<(), Error> { - eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); - Ok(()) + Ok(ret) } }