Don't do version & block_ref updates in background on deletion

This commit is contained in:
Alex 2020-04-19 20:52:20 +00:00
parent 5ae32972ef
commit 04acaea231
7 changed files with 56 additions and 46 deletions

5
TODO
View File

@ -6,6 +6,11 @@ Finish the thing that sends blocks to other nodes if needed before deleting them
How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times.
Repair:
- re-propagate object table version deletions to version table
- re-propagate version table deletion to block ref table
- re-propagate block ref table to rc
To do list
----------

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::*;
use crate::data::*;
use crate::error::Error;
use crate::table::*;
use crate::block::*;
@ -47,20 +48,17 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) {
eprintln!("Failed to incref block {:?}: {}", block, e);
}
self.block_manager.block_incref(block)?;
}
if was_before && !is_after {
if let Err(e) = self.block_manager.block_decref(block) {
eprintln!("Failed to decref block {:?}: {}", block, e);
}
self.block_manager.block_decref(block)?;
}
Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {

View File

@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::error::Error;
use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@ -71,7 +72,9 @@ impl TableSchema for BucketTable {
type E = Bucket;
type Filter = ();
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
!entry.deleted

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@ -101,30 +102,28 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let version_table = self.version_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
self.background.spawn(async move {
for v in old_v.versions.iter() {
if new_v
.versions
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
.is_err()
{
let deleted_version = Version {
uuid: v.uuid.clone(),
deleted: true,
blocks: vec![],
bucket: old_v.bucket.clone(),
key: old_v.key.clone(),
};
version_table.insert(&deleted_version).await?;
}
for v in old_v.versions.iter() {
if new_v
.versions
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
.is_err()
{
let deleted_version = Version {
uuid: v.uuid.clone(),
deleted: true,
blocks: vec![],
bucket: old_v.bucket.clone(),
key: old_v.key.clone(),
};
version_table.insert(&deleted_version).await?;
}
Ok(())
});
}
}
Ok(())
}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {

View File

@ -105,7 +105,7 @@ pub trait TableSchema: Send + Sync {
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
@ -469,7 +469,7 @@ where
epidemic_propagate.push(new_entry.clone());
}
self.instance.updated(old_entry, Some(new_entry)).await;
self.instance.updated(old_entry, Some(new_entry)).await?;
self.system
.background
.spawn(syncer.clone().invalidate(tree_key));
@ -497,7 +497,7 @@ where
}
if let Some(old_val) = self.store.remove(&key)? {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
self.instance.updated(Some(old_entry), None).await;
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
.spawn(syncer.clone().invalidate(key.to_vec()));

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@ -67,26 +68,24 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block_ref_table = self.block_ref_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
self.background.spawn(async move {
if new_v.deleted && !old_v.deleted {
let deleted_block_refs = old_v
.blocks
.iter()
.map(|vb| BlockRef {
block: vb.hash.clone(),
version: old_v.uuid.clone(),
deleted: true,
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
}
Ok(())
});
if new_v.deleted && !old_v.deleted {
let deleted_block_refs = old_v
.blocks
.iter()
.map(|vb| BlockRef {
block: vb.hash.clone(),
version: old_v.uuid.clone(),
deleted: true,
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
}
}
Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {

6
test_delete.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
for FILE in $(find target/debug/deps); do
curl -v localhost:3900/$FILE -X DELETE -H 'Host: garage'
done