Handle correctly deletion dues to offloading
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Alex 2021-02-23 19:59:43 +01:00
parent 55156cca9d
commit 28bc967c83
2 changed files with 38 additions and 42 deletions

View File

@ -27,7 +27,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub replication: R, pub replication: R,
pub name: String, pub name: String,
pub rpc_client: Arc<RpcClient<TableRPC<F>>>, pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
pub system: Arc<System>, pub system: Arc<System>,
pub store: sled::Tree, pub store: sled::Tree,
@ -35,7 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum TableRPC<F: TableSchema> { pub(crate) enum TableRPC<F: TableSchema> {
Ok, Ok,
ReadEntry(F::P, F::S), ReadEntry(F::P, F::S),
@ -415,9 +415,7 @@ where
} }
self.instance.updated(old_entry, Some(new_entry)).await?; self.instance.updated(old_entry, Some(new_entry)).await?;
self.system syncer.invalidate(&tree_key[..]);
.background
.spawn_cancellable(syncer.clone().invalidate(tree_key));
} }
} }
@ -431,26 +429,26 @@ where
Ok(()) Ok(())
} }
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { pub(crate) async fn delete_if_equal(
let syncer = self.syncer.load_full().unwrap(); self: &Arc<Self>,
k: &[u8],
debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); v: &[u8],
let mut count: usize = 0; ) -> Result<bool, Error> {
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { let removed = self.store.transaction(|txn| {
if key.as_ref() < begin.as_slice() { if let Some(cur_v) = self.store.get(k)? {
break; if cur_v == v {
} txn.remove(v)?;
if let Some(old_val) = self.store.remove(&key)? { return Ok(true);
let old_entry = self.decode_entry(&old_val)?; }
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
.spawn_cancellable(syncer.clone().invalidate(key.to_vec()));
count += 1;
} }
Ok(false)
})?;
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None).await?;
self.syncer.load_full().unwrap().invalidate(k);
} }
debug!("({}) {} entries deleted", self.name, count); Ok(removed)
Ok(())
} }
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {

View File

@ -29,14 +29,14 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum SyncRPC { pub(crate) enum SyncRPC {
GetRootChecksumRange(Hash, Hash), GetRootChecksumRange(Hash, Hash),
RootChecksumRange(SyncRange), RootChecksumRange(SyncRange),
Checksums(Vec<RangeChecksum>), Checksums(Vec<RangeChecksum>),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
} }
pub struct SyncTodo { struct SyncTodo {
todo: Vec<TodoPartition>, todo: Vec<TodoPartition>,
} }
@ -60,7 +60,7 @@ struct TodoPartition {
// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) // (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
// See RangeChecksum for the struct that stores this information. // See RangeChecksum for the struct that stores this information.
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct SyncRange { pub(crate) struct SyncRange {
begin: Vec<u8>, begin: Vec<u8>,
end: Vec<u8>, end: Vec<u8>,
level: usize, level: usize,
@ -81,7 +81,7 @@ impl std::cmp::Ord for SyncRange {
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangeChecksum { pub(crate) struct RangeChecksum {
bounds: SyncRange, bounds: SyncRange,
children: Vec<(SyncRange, Hash)>, children: Vec<(SyncRange, Hash)>,
found_limit: Option<Vec<u8>>, found_limit: Option<Vec<u8>>,
@ -91,7 +91,7 @@ pub struct RangeChecksum {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RangeChecksumCache { struct RangeChecksumCache {
hash: Option<Hash>, // None if no children hash: Option<Hash>, // None if no children
found_limit: Option<Vec<u8>>, found_limit: Option<Vec<u8>>,
time: Instant, time: Instant,
@ -102,7 +102,7 @@ where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() }; let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer { let syncer = Arc::new(TableSyncer {
table: table.clone(), table: table.clone(),
@ -348,15 +348,14 @@ where
} }
// All remote nodes have written those items, now we can delete them locally // All remote nodes have written those items, now we can delete them locally
for (k, v) in items.iter() { for was_removed in join_all(
self.table.store.transaction(|tx_db| { items
if let Some(curv) = tx_db.get(k)? { .iter()
if curv == &v[..] { .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
tx_db.remove(&k[..])?; )
} .await
} {
Ok(()) was_removed?;
})?;
} }
Ok(()) Ok(())
@ -642,7 +641,7 @@ where
} }
} }
pub async fn handle_rpc( pub(crate) async fn handle_rpc(
self: &Arc<Self>, self: &Arc<Self>,
message: &SyncRPC, message: &SyncRPC,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
@ -738,7 +737,7 @@ where
Ok(SyncRPC::Difference(ret_ranges, ret_items)) Ok(SyncRPC::Difference(ret_ranges, ret_items))
} }
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
for i in 1..MAX_DEPTH { for i in 1..MAX_DEPTH {
let needle = SyncRange { let needle = SyncRange {
begin: item_key.to_vec(), begin: item_key.to_vec(),
@ -747,14 +746,13 @@ where
}; };
let mut cache = self.cache[i].lock().unwrap(); let mut cache = self.cache[i].lock().unwrap();
if let Some(cache_entry) = cache.range(..=needle).rev().next() { if let Some(cache_entry) = cache.range(..=needle).rev().next() {
if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
let index = cache_entry.0.clone(); let index = cache_entry.0.clone();
drop(cache_entry); drop(cache_entry);
cache.remove(&index); cache.remove(&index);
} }
} }
} }
Ok(())
} }
} }