Refactor db transactions and add on_commit for table.queue_insert #637
14 changed files with 91 additions and 87 deletions
|
@ -56,7 +56,7 @@ impl BlockRc {
|
|||
/// deletion time has passed
|
||||
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let now = now_msec();
|
||||
self.rc.db().transaction(|mut tx| {
|
||||
self.rc.db().transaction(|tx| {
|
||||
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||
match rcval {
|
||||
RcEntry::Deletable { at_time } if now > at_time => {
|
||||
|
@ -64,7 +64,7 @@ impl BlockRc {
|
|||
}
|
||||
_ => (),
|
||||
};
|
||||
tx.commit(())
|
||||
Ok(())
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ impl CountedTree {
|
|||
let old_some = expected_old.is_some();
|
||||
let new_some = new.is_some();
|
||||
|
||||
let tx_res = self.0.tree.db().transaction(|mut tx| {
|
||||
let tx_res = self.0.tree.db().transaction(|tx| {
|
||||
let old_val = tx.get(&self.0.tree, &key)?;
|
||||
let is_same = match (&old_val, &expected_old) {
|
||||
(None, None) => true,
|
||||
|
@ -101,9 +101,9 @@ impl CountedTree {
|
|||
tx.remove(&self.0.tree, &key)?;
|
||||
}
|
||||
}
|
||||
tx.commit(())
|
||||
Ok(())
|
||||
} else {
|
||||
tx.abort(())
|
||||
Err(TxError::Abort(()))
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -22,10 +22,15 @@ use std::sync::Arc;
|
|||
|
||||
use err_derive::Error;
|
||||
|
||||
pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Db(pub(crate) Arc<dyn IDb>);
|
||||
|
||||
pub struct Transaction<'a>(&'a mut dyn ITx);
|
||||
pub struct Transaction<'a> {
|
||||
tx: &'a mut dyn ITx,
|
||||
on_commit: OnCommit,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Tree(Arc<dyn IDb>, usize);
|
||||
|
@ -85,7 +90,7 @@ impl Db {
|
|||
|
||||
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
|
||||
where
|
||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
||||
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||
{
|
||||
let f = TxFn {
|
||||
function: fun,
|
||||
|
@ -98,14 +103,17 @@ impl Db {
|
|||
.expect("Transaction did not store result");
|
||||
|
||||
match tx_res {
|
||||
Ok(()) => {
|
||||
assert!(matches!(ret, Ok(_)));
|
||||
ret
|
||||
}
|
||||
Err(TxError::Abort(())) => {
|
||||
assert!(matches!(ret, Err(TxError::Abort(_))));
|
||||
ret
|
||||
Ok(on_commit) => match ret {
|
||||
Ok(value) => {
|
||||
on_commit.into_iter().for_each(|f| f());
|
||||
Ok(value)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
},
|
||||
Err(TxError::Abort(())) => match ret {
|
||||
Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
|
||||
_ => unreachable!(),
|
||||
},
|
||||
Err(TxError::Db(e2)) => match ret {
|
||||
// Ok was stored -> the error occured when finalizing
|
||||
// transaction
|
||||
|
@ -139,7 +147,7 @@ impl Db {
|
|||
|
||||
let ex_tree = other.open_tree(&name)?;
|
||||
|
||||
let tx_res = self.transaction(|mut tx| {
|
||||
let tx_res = self.transaction(|tx| {
|
||||
let mut i = 0;
|
||||
for item in ex_tree.iter().map_err(TxError::Abort)? {
|
||||
let (k, v) = item.map_err(TxError::Abort)?;
|
||||
|
@ -149,7 +157,7 @@ impl Db {
|
|||
println!("{}: imported {}", name, i);
|
||||
}
|
||||
}
|
||||
tx.commit(i)
|
||||
Ok(i)
|
||||
});
|
||||
let total = match tx_res {
|
||||
Err(TxError::Db(e)) => return Err(e),
|
||||
|
@ -249,11 +257,11 @@ impl Tree {
|
|||
impl<'a> Transaction<'a> {
|
||||
#[inline]
|
||||
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
||||
self.0.get(tree.1, key.as_ref())
|
||||
self.tx.get(tree.1, key.as_ref())
|
||||
}
|
||||
#[inline]
|
||||
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
|
||||
self.0.len(tree.1)
|
||||
self.tx.len(tree.1)
|
||||
}
|
||||
|
||||
/// Returns the old value if there was one
|
||||
|
@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
|
|||
key: T,
|
||||
value: U,
|
||||
) -> TxOpResult<Option<Value>> {
|
||||
self.0.insert(tree.1, key.as_ref(), value.as_ref())
|
||||
self.tx.insert(tree.1, key.as_ref(), value.as_ref())
|
||||
}
|
||||
/// Returns the old value if there was one
|
||||
#[inline]
|
||||
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
||||
self.0.remove(tree.1, key.as_ref())
|
||||
self.tx.remove(tree.1, key.as_ref())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
||||
self.0.iter(tree.1)
|
||||
self.tx.iter(tree.1)
|
||||
}
|
||||
#[inline]
|
||||
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
||||
self.0.iter_rev(tree.1)
|
||||
self.tx.iter_rev(tree.1)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
|
|||
{
|
||||
let sb = range.start_bound();
|
||||
let eb = range.end_bound();
|
||||
self.0.range(tree.1, get_bound(sb), get_bound(eb))
|
||||
self.tx.range(tree.1, get_bound(sb), get_bound(eb))
|
||||
}
|
||||
#[inline]
|
||||
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
|
||||
|
@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
|
|||
{
|
||||
let sb = range.start_bound();
|
||||
let eb = range.end_bound();
|
||||
self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
#[inline]
|
||||
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
|
||||
Err(TxError::Abort(e))
|
||||
self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
|
||||
Ok(r)
|
||||
pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
|
||||
self.on_commit.push(Box::new(f));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
|
|||
high: Bound<&'r [u8]>,
|
||||
) -> Result<ValueIter<'_>>;
|
||||
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
|
||||
}
|
||||
|
||||
pub(crate) trait ITx {
|
||||
|
@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
|
|||
}
|
||||
|
||||
pub(crate) enum TxFnResult {
|
||||
Ok,
|
||||
Ok(OnCommit),
|
||||
Abort,
|
||||
DbErr,
|
||||
}
|
||||
|
||||
struct TxFn<F, R, E>
|
||||
where
|
||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
||||
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||
{
|
||||
function: F,
|
||||
result: Cell<Option<TxResult<R, E>>>,
|
||||
|
@ -395,12 +396,16 @@ where
|
|||
|
||||
impl<F, R, E> ITxFn for TxFn<F, R, E>
|
||||
where
|
||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
||||
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||
{
|
||||
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
|
||||
let res = (self.function)(Transaction(tx));
|
||||
let mut tx = Transaction {
|
||||
tx,
|
||||
on_commit: vec![],
|
||||
};
|
||||
let res = (self.function)(&mut tx);
|
||||
let res2 = match &res {
|
||||
Ok(_) => TxFnResult::Ok,
|
||||
Ok(_) => TxFnResult::Ok(tx.on_commit),
|
||||
Err(TxError::Abort(_)) => TxFnResult::Abort,
|
||||
Err(TxError::Db(_)) => TxFnResult::DbErr,
|
||||
};
|
||||
|
|
|
@ -9,8 +9,8 @@ use heed::types::ByteSlice;
|
|||
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
|
||||
|
||||
use crate::{
|
||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
||||
TxValueIter, Value, ValueIter,
|
||||
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||
TxResult, TxValueIter, Value, ValueIter,
|
||||
};
|
||||
|
||||
pub use heed;
|
||||
|
@ -186,7 +186,7 @@ impl IDb for LmdbDb {
|
|||
|
||||
// ----
|
||||
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||
let trees = self.trees.read().unwrap();
|
||||
let mut tx = LmdbTx {
|
||||
trees: &trees.0[..],
|
||||
|
@ -199,9 +199,9 @@ impl IDb for LmdbDb {
|
|||
|
||||
let res = f.try_on(&mut tx);
|
||||
match res {
|
||||
TxFnResult::Ok => {
|
||||
TxFnResult::Ok(on_commit) => {
|
||||
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
||||
Ok(())
|
||||
Ok(on_commit)
|
||||
}
|
||||
TxFnResult::Abort => {
|
||||
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
|
||||
|
|
|
@ -10,8 +10,8 @@ use sled::transaction::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
||||
TxValueIter, Value, ValueIter,
|
||||
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||
TxResult, TxValueIter, Value, ValueIter,
|
||||
};
|
||||
|
||||
pub use sled;
|
||||
|
@ -166,7 +166,7 @@ impl IDb for SledDb {
|
|||
|
||||
// ----
|
||||
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||
let trees = self.trees.read().unwrap();
|
||||
let res = trees.0.transaction(|txtrees| {
|
||||
let mut tx = SledTx {
|
||||
|
@ -174,9 +174,9 @@ impl IDb for SledDb {
|
|||
err: Cell::new(None),
|
||||
};
|
||||
match f.try_on(&mut tx) {
|
||||
TxFnResult::Ok => {
|
||||
TxFnResult::Ok(on_commit) => {
|
||||
assert!(tx.err.into_inner().is_none());
|
||||
Ok(())
|
||||
Ok(on_commit)
|
||||
}
|
||||
TxFnResult::Abort => {
|
||||
assert!(tx.err.into_inner().is_none());
|
||||
|
@ -189,7 +189,7 @@ impl IDb for SledDb {
|
|||
}
|
||||
});
|
||||
match res {
|
||||
Ok(()) => Ok(()),
|
||||
Ok(on_commit) => Ok(on_commit),
|
||||
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
|
||||
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
|
||||
}
|
||||
|
|
|
@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
|
|||
use rusqlite::{params, Connection, Rows, Statement, Transaction};
|
||||
|
||||
use crate::{
|
||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
||||
TxValueIter, Value, ValueIter,
|
||||
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||
TxResult, TxValueIter, Value, ValueIter,
|
||||
};
|
||||
|
||||
pub use rusqlite;
|
||||
|
@ -261,7 +261,7 @@ impl IDb for SqliteDb {
|
|||
|
||||
// ----
|
||||
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||
trace!("transaction: lock db");
|
||||
let mut this = self.0.lock().unwrap();
|
||||
trace!("transaction: lock acquired");
|
||||
|
@ -277,9 +277,9 @@ impl IDb for SqliteDb {
|
|||
trees: &this_mut_ref.trees,
|
||||
};
|
||||
let res = match f.try_on(&mut tx) {
|
||||
TxFnResult::Ok => {
|
||||
TxFnResult::Ok(on_commit) => {
|
||||
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
||||
Ok(())
|
||||
Ok(on_commit)
|
||||
}
|
||||
TxFnResult::Abort => {
|
||||
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
|
||||
|
|
|
@ -13,26 +13,26 @@ fn test_suite(db: Db) {
|
|||
assert!(tree.insert(ka, va).unwrap().is_none());
|
||||
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
|
||||
|
||||
let res = db.transaction::<_, (), _>(|mut tx| {
|
||||
let res = db.transaction::<_, (), _>(|tx| {
|
||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
|
||||
|
||||
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
|
||||
|
||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
||||
|
||||
tx.commit(12)
|
||||
Ok(12)
|
||||
});
|
||||
assert!(matches!(res, Ok(12)));
|
||||
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
||||
|
||||
let res = db.transaction::<(), _, _>(|mut tx| {
|
||||
let res = db.transaction::<(), _, _>(|tx| {
|
||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
||||
|
||||
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
|
||||
|
||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
|
||||
|
||||
tx.abort(42)
|
||||
Err(TxError::Abort(42))
|
||||
});
|
||||
assert!(matches!(res, Err(TxError::Abort(42))));
|
||||
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
||||
|
|
|
@ -52,7 +52,7 @@ impl Instance {
|
|||
r#"
|
||||
metadata_dir = "{path}/meta"
|
||||
data_dir = "{path}/data"
|
||||
db_engine = "sled"
|
||||
db_engine = "lmdb"
|
||||
|
||||
replication_mode = "1"
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ async fn test_items_and_indices() {
|
|||
let content = format!("{}: hello world", sk).into_bytes();
|
||||
let content2 = format!("{}: hello universe", sk).into_bytes();
|
||||
let content3 = format!("{}: concurrent value", sk).into_bytes();
|
||||
eprintln!("test iteration {}: {}", i, sk);
|
||||
|
||||
// Put initially, no causality token
|
||||
let res = ctx
|
||||
|
@ -89,7 +90,7 @@ async fn test_items_and_indices() {
|
|||
assert_eq!(res_body, content);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
|
@ -158,7 +159,7 @@ async fn test_items_and_indices() {
|
|||
assert_eq!(res_body, content2);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
|
@ -230,7 +231,7 @@ async fn test_items_and_indices() {
|
|||
);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
|
@ -299,7 +300,7 @@ async fn test_items_and_indices() {
|
|||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
|
|
|
@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||
self.local_counter
|
||||
.db()
|
||||
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
||||
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
|
||||
|
||||
next_start = Some(local_counter_k);
|
||||
}
|
||||
|
@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||
self.local_counter
|
||||
.db()
|
||||
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
||||
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
|
||||
|
||||
next_start = Some(counted_entry_k);
|
||||
}
|
||||
|
|
|
@ -330,9 +330,7 @@ async fn process_object(
|
|||
"Lifecycle: expiring 1 object in bucket {:?}",
|
||||
object.bucket_id
|
||||
);
|
||||
db.transaction(|mut tx| {
|
||||
garage.object_table.queue_insert(&mut tx, &deleted_object)
|
||||
})?;
|
||||
db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
|
||||
*objects_expired += 1;
|
||||
}
|
||||
}
|
||||
|
@ -365,9 +363,7 @@ async fn process_object(
|
|||
);
|
||||
let aborted_object =
|
||||
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
|
||||
db.transaction(|mut tx| {
|
||||
garage.object_table.queue_insert(&mut tx, &aborted_object)
|
||||
})?;
|
||||
db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
|
||||
*mpu_aborted += n_aborted;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
|
|||
pub(crate) merkle_todo_notify: Notify,
|
||||
|
||||
pub(crate) insert_queue: db::Tree,
|
||||
pub(crate) insert_queue_notify: Notify,
|
||||
pub(crate) insert_queue_notify: Arc<Notify>,
|
||||
|
||||
pub(crate) gc_todo: CountedTree,
|
||||
|
||||
|
@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
merkle_todo,
|
||||
merkle_todo_notify: Notify::new(),
|
||||
insert_queue,
|
||||
insert_queue_notify: Notify::new(),
|
||||
insert_queue_notify: Arc::new(Notify::new()),
|
||||
gc_todo,
|
||||
metrics,
|
||||
})
|
||||
|
@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
) -> Result<Option<F::E>, Error> {
|
||||
let tree_key = self.tree_key(partition_key, sort_key);
|
||||
|
||||
let changed = self.store.db().transaction(|mut tx| {
|
||||
let changed = self.store.db().transaction(|tx| {
|
||||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||
Some(old_bytes) => {
|
||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
|
||||
let new_entry = update_fn(tx, Some(old_entry.clone()))?;
|
||||
(Some(old_entry), Some(old_bytes), new_entry)
|
||||
}
|
||||
None => (None, None, update_fn(&mut tx, None)?),
|
||||
None => (None, None, update_fn(tx, None)?),
|
||||
};
|
||||
|
||||
// Changed can be true in two scenarios
|
||||
|
@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
tx.insert(&self.store, &tree_key, new_bytes)?;
|
||||
|
||||
self.instance
|
||||
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
|
||||
.updated(tx, old_entry.as_ref(), Some(&new_entry))?;
|
||||
|
||||
Ok(Some((new_entry, new_bytes_hash)))
|
||||
} else {
|
||||
|
@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
let removed = self
|
||||
.store
|
||||
.db()
|
||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
||||
.transaction(|tx| match tx.get(&self.store, k)? {
|
||||
Some(cur_v) if cur_v == v => {
|
||||
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
|
||||
|
||||
tx.remove(&self.store, k)?;
|
||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||
|
||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
||||
self.instance.updated(tx, Some(&old_entry), None)?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => Ok(false),
|
||||
|
@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
let removed = self
|
||||
.store
|
||||
.db()
|
||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
||||
.transaction(|tx| match tx.get(&self.store, k)? {
|
||||
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
|
||||
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
|
||||
|
||||
tx.remove(&self.store, k)?;
|
||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||
|
||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
||||
self.instance.updated(tx, Some(&old_entry), None)?;
|
||||
Ok(true)
|
||||
}
|
||||
_ => Ok(false),
|
||||
|
@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
.map_err(Error::RmpEncode)
|
||||
.map_err(db::TxError::Abort)?;
|
||||
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
|
||||
self.insert_queue_notify.notify_one();
|
||||
|
||||
let notif = self.insert_queue_notify.clone();
|
||||
tx.on_commit(move || notif.notify_one());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
|
|||
self.data
|
||||
.merkle_tree
|
||||
.db()
|
||||
.transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
|
||||
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
|
||||
|
||||
let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
|
||||
let deleted = self.data.merkle_todo.db().transaction(|tx| {
|
||||
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
|
||||
if remove {
|
||||
tx.remove(&self.data.merkle_todo, k)?;
|
||||
|
|
|
@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
|
|||
|
||||
self.0.insert_many(values).await?;
|
||||
|
||||
self.0.data.insert_queue.db().transaction(|mut tx| {
|
||||
self.0.data.insert_queue.db().transaction(|tx| {
|
||||
for (k, v) in kv_pairs.iter() {
|
||||
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
|
||||
if &v2 == v {
|
||||
|
|
Loading…
Reference in a new issue