Refactor db transactions and add on_commit for table.queue_insert #637

Merged
lx merged 2 commits from k2v-indices-lmdb into next 2023-09-21 14:03:36 +00:00
14 changed files with 91 additions and 87 deletions

View file

@ -56,7 +56,7 @@ impl BlockRc {
/// deletion time has passed /// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec(); let now = now_msec();
self.rc.db().transaction(|mut tx| { self.rc.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval { match rcval {
RcEntry::Deletable { at_time } if now > at_time => { RcEntry::Deletable { at_time } if now > at_time => {
@ -64,7 +64,7 @@ impl BlockRc {
} }
_ => (), _ => (),
}; };
tx.commit(()) Ok(())
})?; })?;
Ok(()) Ok(())
} }

View file

@ -85,7 +85,7 @@ impl CountedTree {
let old_some = expected_old.is_some(); let old_some = expected_old.is_some();
let new_some = new.is_some(); let new_some = new.is_some();
let tx_res = self.0.tree.db().transaction(|mut tx| { let tx_res = self.0.tree.db().transaction(|tx| {
let old_val = tx.get(&self.0.tree, &key)?; let old_val = tx.get(&self.0.tree, &key)?;
let is_same = match (&old_val, &expected_old) { let is_same = match (&old_val, &expected_old) {
(None, None) => true, (None, None) => true,
@ -101,9 +101,9 @@ impl CountedTree {
tx.remove(&self.0.tree, &key)?; tx.remove(&self.0.tree, &key)?;
} }
} }
tx.commit(()) Ok(())
} else { } else {
tx.abort(()) Err(TxError::Abort(()))
} }
}); });

View file

@ -22,10 +22,15 @@ use std::sync::Arc;
use err_derive::Error; use err_derive::Error;
pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
#[derive(Clone)] #[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>); pub struct Db(pub(crate) Arc<dyn IDb>);
pub struct Transaction<'a>(&'a mut dyn ITx); pub struct Transaction<'a> {
tx: &'a mut dyn ITx,
on_commit: OnCommit,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize); pub struct Tree(Arc<dyn IDb>, usize);
@ -85,7 +90,7 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{ {
let f = TxFn { let f = TxFn {
function: fun, function: fun,
@ -98,14 +103,17 @@ impl Db {
.expect("Transaction did not store result"); .expect("Transaction did not store result");
match tx_res { match tx_res {
Ok(()) => { Ok(on_commit) => match ret {
assert!(matches!(ret, Ok(_))); Ok(value) => {
ret on_commit.into_iter().for_each(|f| f());
} Ok(value)
Err(TxError::Abort(())) => { }
assert!(matches!(ret, Err(TxError::Abort(_)))); _ => unreachable!(),
ret },
} Err(TxError::Abort(())) => match ret {
Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
_ => unreachable!(),
},
Err(TxError::Db(e2)) => match ret { Err(TxError::Db(e2)) => match ret {
// Ok was stored -> the error occured when finalizing // Ok was stored -> the error occured when finalizing
// transaction // transaction
@ -139,7 +147,7 @@ impl Db {
let ex_tree = other.open_tree(&name)?; let ex_tree = other.open_tree(&name)?;
let tx_res = self.transaction(|mut tx| { let tx_res = self.transaction(|tx| {
let mut i = 0; let mut i = 0;
for item in ex_tree.iter().map_err(TxError::Abort)? { for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item.map_err(TxError::Abort)?; let (k, v) = item.map_err(TxError::Abort)?;
@ -149,7 +157,7 @@ impl Db {
println!("{}: imported {}", name, i); println!("{}: imported {}", name, i);
} }
} }
tx.commit(i) Ok(i)
}); });
let total = match tx_res { let total = match tx_res {
Err(TxError::Db(e)) => return Err(e), Err(TxError::Db(e)) => return Err(e),
@ -249,11 +257,11 @@ impl Tree {
impl<'a> Transaction<'a> { impl<'a> Transaction<'a> {
#[inline] #[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.get(tree.1, key.as_ref()) self.tx.get(tree.1, key.as_ref())
} }
#[inline] #[inline]
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> { pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
self.0.len(tree.1) self.tx.len(tree.1)
} }
/// Returns the old value if there was one /// Returns the old value if there was one
@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
key: T, key: T,
value: U, value: U,
) -> TxOpResult<Option<Value>> { ) -> TxOpResult<Option<Value>> {
self.0.insert(tree.1, key.as_ref(), value.as_ref()) self.tx.insert(tree.1, key.as_ref(), value.as_ref())
} }
/// Returns the old value if there was one /// Returns the old value if there was one
#[inline] #[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.remove(tree.1, key.as_ref()) self.tx.remove(tree.1, key.as_ref())
} }
#[inline] #[inline]
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
self.0.iter(tree.1) self.tx.iter(tree.1)
} }
#[inline] #[inline]
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> { pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
self.0.iter_rev(tree.1) self.tx.iter_rev(tree.1)
} }
#[inline] #[inline]
@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
{ {
let sb = range.start_bound(); let sb = range.start_bound();
let eb = range.end_bound(); let eb = range.end_bound();
self.0.range(tree.1, get_bound(sb), get_bound(eb)) self.tx.range(tree.1, get_bound(sb), get_bound(eb))
} }
#[inline] #[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>> pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
{ {
let sb = range.start_bound(); let sb = range.start_bound();
let eb = range.end_bound(); let eb = range.end_bound();
self.0.range_rev(tree.1, get_bound(sb), get_bound(eb)) self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
// ----
#[inline]
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
Err(TxError::Abort(e))
} }
#[inline] #[inline]
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> { pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
Ok(r) self.on_commit.push(Box::new(f));
} }
} }
@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>; ) -> Result<ValueIter<'_>>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
} }
pub(crate) trait ITx { pub(crate) trait ITx {
@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
} }
pub(crate) enum TxFnResult { pub(crate) enum TxFnResult {
Ok, Ok(OnCommit),
Abort, Abort,
DbErr, DbErr,
} }
struct TxFn<F, R, E> struct TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{ {
function: F, function: F,
result: Cell<Option<TxResult<R, E>>>, result: Cell<Option<TxResult<R, E>>>,
@ -395,12 +396,16 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E> impl<F, R, E> ITxFn for TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{ {
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
let res = (self.function)(Transaction(tx)); let mut tx = Transaction {
tx,
on_commit: vec![],
};
let res = (self.function)(&mut tx);
let res2 = match &res { let res2 = match &res {
Ok(_) => TxFnResult::Ok, Ok(_) => TxFnResult::Ok(tx.on_commit),
Err(TxError::Abort(_)) => TxFnResult::Abort, Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::DbErr, Err(TxError::Db(_)) => TxFnResult::DbErr,
}; };

View file

@ -9,8 +9,8 @@ use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{ use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxValueIter, Value, ValueIter, TxResult, TxValueIter, Value, ValueIter,
}; };
pub use heed; pub use heed;
@ -186,7 +186,7 @@ impl IDb for LmdbDb {
// ---- // ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap(); let trees = self.trees.read().unwrap();
let mut tx = LmdbTx { let mut tx = LmdbTx {
trees: &trees.0[..], trees: &trees.0[..],
@ -199,9 +199,9 @@ impl IDb for LmdbDb {
let res = f.try_on(&mut tx); let res = f.try_on(&mut tx);
match res { match res {
TxFnResult::Ok => { TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(()) Ok(on_commit)
} }
TxFnResult::Abort => { TxFnResult::Abort => {
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;

View file

@ -10,8 +10,8 @@ use sled::transaction::{
}; };
use crate::{ use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxValueIter, Value, ValueIter, TxResult, TxValueIter, Value, ValueIter,
}; };
pub use sled; pub use sled;
@ -166,7 +166,7 @@ impl IDb for SledDb {
// ---- // ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap(); let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| { let res = trees.0.transaction(|txtrees| {
let mut tx = SledTx { let mut tx = SledTx {
@ -174,9 +174,9 @@ impl IDb for SledDb {
err: Cell::new(None), err: Cell::new(None),
}; };
match f.try_on(&mut tx) { match f.try_on(&mut tx) {
TxFnResult::Ok => { TxFnResult::Ok(on_commit) => {
assert!(tx.err.into_inner().is_none()); assert!(tx.err.into_inner().is_none());
Ok(()) Ok(on_commit)
} }
TxFnResult::Abort => { TxFnResult::Abort => {
assert!(tx.err.into_inner().is_none()); assert!(tx.err.into_inner().is_none());
@ -189,7 +189,7 @@ impl IDb for SledDb {
} }
}); });
match res { match res {
Ok(()) => Ok(()), Ok(on_commit) => Ok(on_commit),
Err(TransactionError::Abort(())) => Err(TxError::Abort(())), Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
} }

View file

@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
use rusqlite::{params, Connection, Rows, Statement, Transaction}; use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{ use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxValueIter, Value, ValueIter, TxResult, TxValueIter, Value, ValueIter,
}; };
pub use rusqlite; pub use rusqlite;
@ -261,7 +261,7 @@ impl IDb for SqliteDb {
// ---- // ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
trace!("transaction: lock db"); trace!("transaction: lock db");
let mut this = self.0.lock().unwrap(); let mut this = self.0.lock().unwrap();
trace!("transaction: lock acquired"); trace!("transaction: lock acquired");
@ -277,9 +277,9 @@ impl IDb for SqliteDb {
trees: &this_mut_ref.trees, trees: &this_mut_ref.trees,
}; };
let res = match f.try_on(&mut tx) { let res = match f.try_on(&mut tx) {
TxFnResult::Ok => { TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(()) Ok(on_commit)
} }
TxFnResult::Abort => { TxFnResult::Abort => {
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;

View file

@ -13,26 +13,26 @@ fn test_suite(db: Db) {
assert!(tree.insert(ka, va).unwrap().is_none()); assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va); assert_eq!(tree.get(ka).unwrap().unwrap(), va);
let res = db.transaction::<_, (), _>(|mut tx| { let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va); assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
tx.commit(12) Ok(12)
}); });
assert!(matches!(res, Ok(12))); assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb); assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
let res = db.transaction::<(), _, _>(|mut tx| { let res = db.transaction::<(), _, _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb); assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
tx.abort(42) Err(TxError::Abort(42))
}); });
assert!(matches!(res, Err(TxError::Abort(42)))); assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb); assert_eq!(tree.get(ka).unwrap().unwrap(), vb);

View file

@ -52,7 +52,7 @@ impl Instance {
r#" r#"
metadata_dir = "{path}/meta" metadata_dir = "{path}/meta"
data_dir = "{path}/data" data_dir = "{path}/data"
db_engine = "sled" db_engine = "lmdb"
replication_mode = "1" replication_mode = "1"

View file

@ -44,6 +44,7 @@ async fn test_items_and_indices() {
let content = format!("{}: hello world", sk).into_bytes(); let content = format!("{}: hello world", sk).into_bytes();
let content2 = format!("{}: hello universe", sk).into_bytes(); let content2 = format!("{}: hello universe", sk).into_bytes();
let content3 = format!("{}: concurrent value", sk).into_bytes(); let content3 = format!("{}: concurrent value", sk).into_bytes();
eprintln!("test iteration {}: {}", i, sk);
// Put initially, no causality token // Put initially, no causality token
let res = ctx let res = ctx
@ -89,7 +90,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content); assert_eq!(res_body, content);
// ReadIndex -- now there should be some stuff // ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx let res = ctx
.k2v .k2v
.request .request
@ -158,7 +159,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content2); assert_eq!(res_body, content2);
// ReadIndex -- now there should be some stuff // ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx let res = ctx
.k2v .k2v
.request .request
@ -230,7 +231,7 @@ async fn test_items_and_indices() {
); );
// ReadIndex -- now there should be some stuff // ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx let res = ctx
.k2v .k2v
.request .request
@ -299,7 +300,7 @@ async fn test_items_and_indices() {
assert_eq!(res.status(), StatusCode::NO_CONTENT); assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff // ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx let res = ctx
.k2v .k2v
.request .request

View file

@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node); let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter self.local_counter
.db() .db()
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(local_counter_k); next_start = Some(local_counter_k);
} }
@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node); let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter self.local_counter
.db() .db()
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; .transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(counted_entry_k); next_start = Some(counted_entry_k);
} }

View file

@ -330,9 +330,7 @@ async fn process_object(
"Lifecycle: expiring 1 object in bucket {:?}", "Lifecycle: expiring 1 object in bucket {:?}",
object.bucket_id object.bucket_id
); );
db.transaction(|mut tx| { db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
garage.object_table.queue_insert(&mut tx, &deleted_object)
})?;
*objects_expired += 1; *objects_expired += 1;
} }
} }
@ -365,9 +363,7 @@ async fn process_object(
); );
let aborted_object = let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions); Object::new(object.bucket_id, object.key.clone(), aborted_versions);
db.transaction(|mut tx| { db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
garage.object_table.queue_insert(&mut tx, &aborted_object)
})?;
*mpu_aborted += n_aborted; *mpu_aborted += n_aborted;
} }
} }

View file

@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_todo_notify: Notify, pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree, pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Notify, pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: CountedTree, pub(crate) gc_todo: CountedTree,
@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
merkle_todo, merkle_todo,
merkle_todo_notify: Notify::new(), merkle_todo_notify: Notify::new(),
insert_queue, insert_queue,
insert_queue_notify: Notify::new(), insert_queue_notify: Arc::new(Notify::new()),
gc_todo, gc_todo,
metrics, metrics,
}) })
@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key); let tree_key = self.tree_key(partition_key, sort_key);
let changed = self.store.db().transaction(|mut tx| { let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => { Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; let new_entry = update_fn(tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry) (Some(old_entry), Some(old_bytes), new_entry)
} }
None => (None, None, update_fn(&mut tx, None)?), None => (None, None, update_fn(tx, None)?),
}; };
// Changed can be true in two scenarios // Changed can be true in two scenarios
@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
tx.insert(&self.store, &tree_key, new_bytes)?; tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; .updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash))) Ok(Some((new_entry, new_bytes_hash)))
} else { } else {
@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self let removed = self
.store .store
.db() .db()
.transaction(|mut tx| match tx.get(&self.store, k)? { .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => { Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(&mut tx, Some(&old_entry), None)?; self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true) Ok(true)
} }
_ => Ok(false), _ => Ok(false),
@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self let removed = self
.store .store
.db() .db()
.transaction(|mut tx| match tx.get(&self.store, k)? { .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(&mut tx, Some(&old_entry), None)?; self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true) Ok(true)
} }
_ => Ok(false), _ => Ok(false),
@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?; .map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?;
self.insert_queue_notify.notify_one();
let notif = self.insert_queue_notify.clone();
tx.on_commit(move || notif.notify_one());
Ok(()) Ok(())
} }

View file

@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
self.data self.data
.merkle_tree .merkle_tree
.db() .db()
.transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|mut tx| { let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove { if remove {
tx.remove(&self.data.merkle_todo, k)?; tx.remove(&self.data.merkle_todo, k)?;

View file

@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
self.0.insert_many(values).await?; self.0.insert_many(values).await?;
self.0.data.insert_queue.db().transaction(|mut tx| { self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() { for (k, v) in kv_pairs.iter() {
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
if &v2 == v { if &v2 == v {