diff --git a/src/table/table.rs b/src/table/table.rs index 312415304..018426c40 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -27,7 +27,7 @@ pub struct Table { pub replication: R, pub name: String, - pub rpc_client: Arc>>, + pub(crate) rpc_client: Arc>>, pub system: Arc, pub store: sled::Tree, @@ -35,7 +35,7 @@ pub struct Table { } #[derive(Serialize, Deserialize)] -pub enum TableRPC { +pub(crate) enum TableRPC { Ok, ReadEntry(F::P, F::S), @@ -415,9 +415,7 @@ where } self.instance.updated(old_entry, Some(new_entry)).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(tree_key)); + syncer.invalidate(&tree_key[..]); } } @@ -431,26 +429,26 @@ where Ok(()) } - pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); - - debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); - let mut count: usize = 0; - while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { - if key.as_ref() < begin.as_slice() { - break; - } - if let Some(old_val) = self.store.remove(&key)? { - let old_entry = self.decode_entry(&old_val)?; - self.instance.updated(Some(old_entry), None).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); - count += 1; + pub(crate) async fn delete_if_equal( + self: &Arc, + k: &[u8], + v: &[u8], + ) -> Result { + let removed = self.store.transaction(|txn| { + if let Some(cur_v) = self.store.get(k)? { + if cur_v == v { + txn.remove(v)?; + return Ok(true); + } } + Ok(false) + })?; + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None).await?; + self.syncer.load_full().unwrap().invalidate(k); } - debug!("({}) {} entries deleted", self.name, count); - Ok(()) + Ok(removed) } fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 1a1b328b8..11b1c211a 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -29,14 +29,14 @@ pub struct TableSyncer { } #[derive(Serialize, Deserialize)] -pub enum SyncRPC { +pub(crate) enum SyncRPC { GetRootChecksumRange(Hash, Hash), RootChecksumRange(SyncRange), Checksums(Vec), Difference(Vec, Vec>), } -pub struct SyncTodo { +struct SyncTodo { todo: Vec, } @@ -60,7 +60,7 @@ struct TodoPartition { // (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) // See RangeChecksum for the struct that stores this information. #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub struct SyncRange { +pub(crate) struct SyncRange { begin: Vec, end: Vec, level: usize, @@ -81,7 +81,7 @@ impl std::cmp::Ord for SyncRange { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RangeChecksum { +pub(crate) struct RangeChecksum { bounds: SyncRange, children: Vec<(SyncRange, Hash)>, found_limit: Option>, @@ -91,7 +91,7 @@ pub struct RangeChecksum { } #[derive(Debug, Clone)] -pub struct RangeChecksumCache { +struct RangeChecksumCache { hash: Option, // None if no children found_limit: Option>, time: Instant, @@ -102,7 +102,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub async fn launch(table: Arc>) -> Arc { + pub(crate) async fn launch(table: Arc>) -> Arc { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -348,15 +348,14 @@ where } // All remote nodes have written those items, now we can delete them locally - for (k, v) in items.iter() { - self.table.store.transaction(|tx_db| { - if let Some(curv) = tx_db.get(k)? { - if curv == &v[..] { - tx_db.remove(&k[..])?; - } - } - Ok(()) - })?; + for was_removed in join_all( + items + .iter() + .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])), + ) + .await + { + was_removed?; } Ok(()) @@ -642,7 +641,7 @@ where } } - pub async fn handle_rpc( + pub(crate) async fn handle_rpc( self: &Arc, message: &SyncRPC, mut must_exit: watch::Receiver, @@ -738,7 +737,7 @@ where Ok(SyncRPC::Difference(ret_ranges, ret_items)) } - pub async fn invalidate(self: Arc, item_key: Vec) -> Result<(), Error> { + pub(crate) fn invalidate(self: &Arc, item_key: &[u8]) { for i in 1..MAX_DEPTH { let needle = SyncRange { begin: item_key.to_vec(), @@ -747,14 +746,13 @@ where }; let mut cache = self.cache[i].lock().unwrap(); if let Some(cache_entry) = cache.range(..=needle).rev().next() { - if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { + if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key { let index = cache_entry.0.clone(); drop(cache_entry); cache.remove(&index); } } } - Ok(()) } }