Refactor db transactions and add on_commit for table.queue_insert #637
14 changed files with 86 additions and 84 deletions
|
@ -56,7 +56,7 @@ impl BlockRc {
|
||||||
/// deletion time has passed
|
/// deletion time has passed
|
||||||
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
self.rc.db().transaction(|mut tx| {
|
self.rc.db().transaction(|tx| {
|
||||||
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||||
match rcval {
|
match rcval {
|
||||||
RcEntry::Deletable { at_time } if now > at_time => {
|
RcEntry::Deletable { at_time } if now > at_time => {
|
||||||
|
@ -64,7 +64,7 @@ impl BlockRc {
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
tx.commit(())
|
Ok(())
|
||||||
})?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,7 @@ impl CountedTree {
|
||||||
let old_some = expected_old.is_some();
|
let old_some = expected_old.is_some();
|
||||||
let new_some = new.is_some();
|
let new_some = new.is_some();
|
||||||
|
|
||||||
let tx_res = self.0.tree.db().transaction(|mut tx| {
|
let tx_res = self.0.tree.db().transaction(|tx| {
|
||||||
let old_val = tx.get(&self.0.tree, &key)?;
|
let old_val = tx.get(&self.0.tree, &key)?;
|
||||||
let is_same = match (&old_val, &expected_old) {
|
let is_same = match (&old_val, &expected_old) {
|
||||||
(None, None) => true,
|
(None, None) => true,
|
||||||
|
@ -101,9 +101,9 @@ impl CountedTree {
|
||||||
tx.remove(&self.0.tree, &key)?;
|
tx.remove(&self.0.tree, &key)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.commit(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
tx.abort(())
|
Err(TxError::Abort(()))
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -22,10 +22,15 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
|
|
||||||
|
pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Db(pub(crate) Arc<dyn IDb>);
|
pub struct Db(pub(crate) Arc<dyn IDb>);
|
||||||
|
|
||||||
pub struct Transaction<'a>(&'a mut dyn ITx);
|
pub struct Transaction<'a> {
|
||||||
|
tx: &'a mut dyn ITx,
|
||||||
|
on_commit: OnCommit,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Tree(Arc<dyn IDb>, usize);
|
pub struct Tree(Arc<dyn IDb>, usize);
|
||||||
|
@ -85,7 +90,7 @@ impl Db {
|
||||||
|
|
||||||
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
|
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
|
||||||
where
|
where
|
||||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||||
{
|
{
|
||||||
let f = TxFn {
|
let f = TxFn {
|
||||||
function: fun,
|
function: fun,
|
||||||
|
@ -98,14 +103,17 @@ impl Db {
|
||||||
.expect("Transaction did not store result");
|
.expect("Transaction did not store result");
|
||||||
|
|
||||||
match tx_res {
|
match tx_res {
|
||||||
Ok(()) => {
|
Ok(on_commit) => match ret {
|
||||||
assert!(matches!(ret, Ok(_)));
|
Ok(value) => {
|
||||||
ret
|
on_commit.into_iter().for_each(|f| f());
|
||||||
}
|
Ok(value)
|
||||||
Err(TxError::Abort(())) => {
|
|
||||||
assert!(matches!(ret, Err(TxError::Abort(_))));
|
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
},
|
||||||
|
Err(TxError::Abort(())) => match ret {
|
||||||
|
Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
|
||||||
|
_ => unreachable!(),
|
||||||
|
},
|
||||||
Err(TxError::Db(e2)) => match ret {
|
Err(TxError::Db(e2)) => match ret {
|
||||||
// Ok was stored -> the error occured when finalizing
|
// Ok was stored -> the error occured when finalizing
|
||||||
// transaction
|
// transaction
|
||||||
|
@ -139,7 +147,7 @@ impl Db {
|
||||||
|
|
||||||
let ex_tree = other.open_tree(&name)?;
|
let ex_tree = other.open_tree(&name)?;
|
||||||
|
|
||||||
let tx_res = self.transaction(|mut tx| {
|
let tx_res = self.transaction(|tx| {
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
for item in ex_tree.iter().map_err(TxError::Abort)? {
|
for item in ex_tree.iter().map_err(TxError::Abort)? {
|
||||||
let (k, v) = item.map_err(TxError::Abort)?;
|
let (k, v) = item.map_err(TxError::Abort)?;
|
||||||
|
@ -149,7 +157,7 @@ impl Db {
|
||||||
println!("{}: imported {}", name, i);
|
println!("{}: imported {}", name, i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.commit(i)
|
Ok(i)
|
||||||
});
|
});
|
||||||
let total = match tx_res {
|
let total = match tx_res {
|
||||||
Err(TxError::Db(e)) => return Err(e),
|
Err(TxError::Db(e)) => return Err(e),
|
||||||
|
@ -249,11 +257,11 @@ impl Tree {
|
||||||
impl<'a> Transaction<'a> {
|
impl<'a> Transaction<'a> {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
||||||
self.0.get(tree.1, key.as_ref())
|
self.tx.get(tree.1, key.as_ref())
|
||||||
}
|
}
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
|
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
|
||||||
self.0.len(tree.1)
|
self.tx.len(tree.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the old value if there was one
|
/// Returns the old value if there was one
|
||||||
|
@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
|
||||||
key: T,
|
key: T,
|
||||||
value: U,
|
value: U,
|
||||||
) -> TxOpResult<Option<Value>> {
|
) -> TxOpResult<Option<Value>> {
|
||||||
self.0.insert(tree.1, key.as_ref(), value.as_ref())
|
self.tx.insert(tree.1, key.as_ref(), value.as_ref())
|
||||||
}
|
}
|
||||||
/// Returns the old value if there was one
|
/// Returns the old value if there was one
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
|
||||||
self.0.remove(tree.1, key.as_ref())
|
self.tx.remove(tree.1, key.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
||||||
self.0.iter(tree.1)
|
self.tx.iter(tree.1)
|
||||||
}
|
}
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
|
||||||
self.0.iter_rev(tree.1)
|
self.tx.iter_rev(tree.1)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
|
||||||
{
|
{
|
||||||
let sb = range.start_bound();
|
let sb = range.start_bound();
|
||||||
let eb = range.end_bound();
|
let eb = range.end_bound();
|
||||||
self.0.range(tree.1, get_bound(sb), get_bound(eb))
|
self.tx.range(tree.1, get_bound(sb), get_bound(eb))
|
||||||
}
|
}
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
|
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
|
||||||
|
@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
|
||||||
{
|
{
|
||||||
let sb = range.start_bound();
|
let sb = range.start_bound();
|
||||||
let eb = range.end_bound();
|
let eb = range.end_bound();
|
||||||
self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
|
self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
|
||||||
}
|
|
||||||
|
|
||||||
// ----
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
|
|
||||||
Err(TxError::Abort(e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
|
pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
|
||||||
Ok(r)
|
self.on_commit.push(Box::new(f));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
|
||||||
high: Bound<&'r [u8]>,
|
high: Bound<&'r [u8]>,
|
||||||
) -> Result<ValueIter<'_>>;
|
) -> Result<ValueIter<'_>>;
|
||||||
|
|
||||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) trait ITx {
|
pub(crate) trait ITx {
|
||||||
|
@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum TxFnResult {
|
pub(crate) enum TxFnResult {
|
||||||
Ok,
|
Ok(OnCommit),
|
||||||
Abort,
|
Abort,
|
||||||
DbErr,
|
DbErr,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TxFn<F, R, E>
|
struct TxFn<F, R, E>
|
||||||
where
|
where
|
||||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||||
{
|
{
|
||||||
function: F,
|
function: F,
|
||||||
result: Cell<Option<TxResult<R, E>>>,
|
result: Cell<Option<TxResult<R, E>>>,
|
||||||
|
@ -395,12 +396,16 @@ where
|
||||||
|
|
||||||
impl<F, R, E> ITxFn for TxFn<F, R, E>
|
impl<F, R, E> ITxFn for TxFn<F, R, E>
|
||||||
where
|
where
|
||||||
F: Fn(Transaction<'_>) -> TxResult<R, E>,
|
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
|
||||||
{
|
{
|
||||||
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
|
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
|
||||||
let res = (self.function)(Transaction(tx));
|
let mut tx = Transaction {
|
||||||
|
tx,
|
||||||
|
on_commit: vec![],
|
||||||
|
};
|
||||||
|
let res = (self.function)(&mut tx);
|
||||||
let res2 = match &res {
|
let res2 = match &res {
|
||||||
Ok(_) => TxFnResult::Ok,
|
Ok(_) => TxFnResult::Ok(tx.on_commit),
|
||||||
Err(TxError::Abort(_)) => TxFnResult::Abort,
|
Err(TxError::Abort(_)) => TxFnResult::Abort,
|
||||||
Err(TxError::Db(_)) => TxFnResult::DbErr,
|
Err(TxError::Db(_)) => TxFnResult::DbErr,
|
||||||
};
|
};
|
||||||
|
|
|
@ -9,8 +9,8 @@ use heed::types::ByteSlice;
|
||||||
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
|
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||||
TxValueIter, Value, ValueIter,
|
TxResult, TxValueIter, Value, ValueIter,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use heed;
|
pub use heed;
|
||||||
|
@ -186,7 +186,7 @@ impl IDb for LmdbDb {
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||||
let trees = self.trees.read().unwrap();
|
let trees = self.trees.read().unwrap();
|
||||||
let mut tx = LmdbTx {
|
let mut tx = LmdbTx {
|
||||||
trees: &trees.0[..],
|
trees: &trees.0[..],
|
||||||
|
@ -199,9 +199,9 @@ impl IDb for LmdbDb {
|
||||||
|
|
||||||
let res = f.try_on(&mut tx);
|
let res = f.try_on(&mut tx);
|
||||||
match res {
|
match res {
|
||||||
TxFnResult::Ok => {
|
TxFnResult::Ok(on_commit) => {
|
||||||
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
Ok(())
|
Ok(on_commit)
|
||||||
}
|
}
|
||||||
TxFnResult::Abort => {
|
TxFnResult::Abort => {
|
||||||
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
|
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
|
|
|
@ -10,8 +10,8 @@ use sled::transaction::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||||
TxValueIter, Value, ValueIter,
|
TxResult, TxValueIter, Value, ValueIter,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use sled;
|
pub use sled;
|
||||||
|
@ -166,7 +166,7 @@ impl IDb for SledDb {
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||||
let trees = self.trees.read().unwrap();
|
let trees = self.trees.read().unwrap();
|
||||||
let res = trees.0.transaction(|txtrees| {
|
let res = trees.0.transaction(|txtrees| {
|
||||||
let mut tx = SledTx {
|
let mut tx = SledTx {
|
||||||
|
@ -174,9 +174,9 @@ impl IDb for SledDb {
|
||||||
err: Cell::new(None),
|
err: Cell::new(None),
|
||||||
};
|
};
|
||||||
match f.try_on(&mut tx) {
|
match f.try_on(&mut tx) {
|
||||||
TxFnResult::Ok => {
|
TxFnResult::Ok(on_commit) => {
|
||||||
assert!(tx.err.into_inner().is_none());
|
assert!(tx.err.into_inner().is_none());
|
||||||
Ok(())
|
Ok(on_commit)
|
||||||
}
|
}
|
||||||
TxFnResult::Abort => {
|
TxFnResult::Abort => {
|
||||||
assert!(tx.err.into_inner().is_none());
|
assert!(tx.err.into_inner().is_none());
|
||||||
|
@ -189,7 +189,7 @@ impl IDb for SledDb {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
match res {
|
match res {
|
||||||
Ok(()) => Ok(()),
|
Ok(on_commit) => Ok(on_commit),
|
||||||
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
|
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
|
||||||
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
|
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
use rusqlite::{params, Connection, Rows, Statement, Transaction};
|
use rusqlite::{params, Connection, Rows, Statement, Transaction};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
|
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||||
TxValueIter, Value, ValueIter,
|
TxResult, TxValueIter, Value, ValueIter,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use rusqlite;
|
pub use rusqlite;
|
||||||
|
@ -261,7 +261,7 @@ impl IDb for SqliteDb {
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||||
trace!("transaction: lock db");
|
trace!("transaction: lock db");
|
||||||
let mut this = self.0.lock().unwrap();
|
let mut this = self.0.lock().unwrap();
|
||||||
trace!("transaction: lock acquired");
|
trace!("transaction: lock acquired");
|
||||||
|
@ -277,9 +277,9 @@ impl IDb for SqliteDb {
|
||||||
trees: &this_mut_ref.trees,
|
trees: &this_mut_ref.trees,
|
||||||
};
|
};
|
||||||
let res = match f.try_on(&mut tx) {
|
let res = match f.try_on(&mut tx) {
|
||||||
TxFnResult::Ok => {
|
TxFnResult::Ok(on_commit) => {
|
||||||
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
Ok(())
|
Ok(on_commit)
|
||||||
}
|
}
|
||||||
TxFnResult::Abort => {
|
TxFnResult::Abort => {
|
||||||
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
|
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
|
|
|
@ -13,26 +13,26 @@ fn test_suite(db: Db) {
|
||||||
assert!(tree.insert(ka, va).unwrap().is_none());
|
assert!(tree.insert(ka, va).unwrap().is_none());
|
||||||
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
|
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
|
||||||
|
|
||||||
let res = db.transaction::<_, (), _>(|mut tx| {
|
let res = db.transaction::<_, (), _>(|tx| {
|
||||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
|
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
|
||||||
|
|
||||||
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
|
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
|
||||||
|
|
||||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
||||||
|
|
||||||
tx.commit(12)
|
Ok(12)
|
||||||
});
|
});
|
||||||
assert!(matches!(res, Ok(12)));
|
assert!(matches!(res, Ok(12)));
|
||||||
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
||||||
|
|
||||||
let res = db.transaction::<(), _, _>(|mut tx| {
|
let res = db.transaction::<(), _, _>(|tx| {
|
||||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
|
||||||
|
|
||||||
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
|
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
|
||||||
|
|
||||||
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
|
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
|
||||||
|
|
||||||
tx.abort(42)
|
Err(TxError::Abort(42))
|
||||||
});
|
});
|
||||||
assert!(matches!(res, Err(TxError::Abort(42))));
|
assert!(matches!(res, Err(TxError::Abort(42))));
|
||||||
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
|
||||||
|
|
|
@ -52,7 +52,7 @@ impl Instance {
|
||||||
r#"
|
r#"
|
||||||
metadata_dir = "{path}/meta"
|
metadata_dir = "{path}/meta"
|
||||||
data_dir = "{path}/data"
|
data_dir = "{path}/data"
|
||||||
db_engine = "sled"
|
db_engine = "lmdb"
|
||||||
|
|
||||||
replication_mode = "1"
|
replication_mode = "1"
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ async fn test_items_and_indices() {
|
||||||
let content = format!("{}: hello world", sk).into_bytes();
|
let content = format!("{}: hello world", sk).into_bytes();
|
||||||
let content2 = format!("{}: hello universe", sk).into_bytes();
|
let content2 = format!("{}: hello universe", sk).into_bytes();
|
||||||
let content3 = format!("{}: concurrent value", sk).into_bytes();
|
let content3 = format!("{}: concurrent value", sk).into_bytes();
|
||||||
|
eprintln!("test iteration {}: {}", i, sk);
|
||||||
|
|
||||||
// Put initially, no causality token
|
// Put initially, no causality token
|
||||||
let res = ctx
|
let res = ctx
|
||||||
|
@ -89,7 +90,7 @@ async fn test_items_and_indices() {
|
||||||
assert_eq!(res_body, content);
|
assert_eq!(res_body, content);
|
||||||
|
|
||||||
// ReadIndex -- now there should be some stuff
|
// ReadIndex -- now there should be some stuff
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
let res = ctx
|
let res = ctx
|
||||||
.k2v
|
.k2v
|
||||||
.request
|
.request
|
||||||
|
@ -158,7 +159,7 @@ async fn test_items_and_indices() {
|
||||||
assert_eq!(res_body, content2);
|
assert_eq!(res_body, content2);
|
||||||
|
|
||||||
// ReadIndex -- now there should be some stuff
|
// ReadIndex -- now there should be some stuff
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
let res = ctx
|
let res = ctx
|
||||||
.k2v
|
.k2v
|
||||||
.request
|
.request
|
||||||
|
@ -230,7 +231,7 @@ async fn test_items_and_indices() {
|
||||||
);
|
);
|
||||||
|
|
||||||
// ReadIndex -- now there should be some stuff
|
// ReadIndex -- now there should be some stuff
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
let res = ctx
|
let res = ctx
|
||||||
.k2v
|
.k2v
|
||||||
.request
|
.request
|
||||||
|
@ -299,7 +300,7 @@ async fn test_items_and_indices() {
|
||||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||||
|
|
||||||
// ReadIndex -- now there should be some stuff
|
// ReadIndex -- now there should be some stuff
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
let res = ctx
|
let res = ctx
|
||||||
.k2v
|
.k2v
|
||||||
.request
|
.request
|
||||||
|
|
|
@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
self.local_counter
|
self.local_counter
|
||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
|
||||||
|
|
||||||
next_start = Some(local_counter_k);
|
next_start = Some(local_counter_k);
|
||||||
}
|
}
|
||||||
|
@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
self.local_counter
|
self.local_counter
|
||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
|
||||||
|
|
||||||
next_start = Some(counted_entry_k);
|
next_start = Some(counted_entry_k);
|
||||||
}
|
}
|
||||||
|
|
|
@ -330,9 +330,7 @@ async fn process_object(
|
||||||
"Lifecycle: expiring 1 object in bucket {:?}",
|
"Lifecycle: expiring 1 object in bucket {:?}",
|
||||||
object.bucket_id
|
object.bucket_id
|
||||||
);
|
);
|
||||||
db.transaction(|mut tx| {
|
db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
|
||||||
garage.object_table.queue_insert(&mut tx, &deleted_object)
|
|
||||||
})?;
|
|
||||||
*objects_expired += 1;
|
*objects_expired += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -365,9 +363,7 @@ async fn process_object(
|
||||||
);
|
);
|
||||||
let aborted_object =
|
let aborted_object =
|
||||||
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
|
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
|
||||||
db.transaction(|mut tx| {
|
db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
|
||||||
garage.object_table.queue_insert(&mut tx, &aborted_object)
|
|
||||||
})?;
|
|
||||||
*mpu_aborted += n_aborted;
|
*mpu_aborted += n_aborted;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
) -> Result<Option<F::E>, Error> {
|
) -> Result<Option<F::E>, Error> {
|
||||||
let tree_key = self.tree_key(partition_key, sort_key);
|
let tree_key = self.tree_key(partition_key, sort_key);
|
||||||
|
|
||||||
let changed = self.store.db().transaction(|mut tx| {
|
let changed = self.store.db().transaction(|tx| {
|
||||||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||||
Some(old_bytes) => {
|
Some(old_bytes) => {
|
||||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||||
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
|
let new_entry = update_fn(tx, Some(old_entry.clone()))?;
|
||||||
(Some(old_entry), Some(old_bytes), new_entry)
|
(Some(old_entry), Some(old_bytes), new_entry)
|
||||||
}
|
}
|
||||||
None => (None, None, update_fn(&mut tx, None)?),
|
None => (None, None, update_fn(tx, None)?),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Changed can be true in two scenarios
|
// Changed can be true in two scenarios
|
||||||
|
@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
tx.insert(&self.store, &tree_key, new_bytes)?;
|
tx.insert(&self.store, &tree_key, new_bytes)?;
|
||||||
|
|
||||||
self.instance
|
self.instance
|
||||||
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
|
.updated(tx, old_entry.as_ref(), Some(&new_entry))?;
|
||||||
|
|
||||||
Ok(Some((new_entry, new_bytes_hash)))
|
Ok(Some((new_entry, new_bytes_hash)))
|
||||||
} else {
|
} else {
|
||||||
|
@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
let removed = self
|
let removed = self
|
||||||
.store
|
.store
|
||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
.transaction(|tx| match tx.get(&self.store, k)? {
|
||||||
Some(cur_v) if cur_v == v => {
|
Some(cur_v) if cur_v == v => {
|
||||||
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
|
||||||
|
|
||||||
tx.remove(&self.store, k)?;
|
tx.remove(&self.store, k)?;
|
||||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||||
|
|
||||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
self.instance.updated(tx, Some(&old_entry), None)?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
_ => Ok(false),
|
_ => Ok(false),
|
||||||
|
@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
let removed = self
|
let removed = self
|
||||||
.store
|
.store
|
||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
.transaction(|tx| match tx.get(&self.store, k)? {
|
||||||
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
|
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
|
||||||
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
|
||||||
|
|
||||||
tx.remove(&self.store, k)?;
|
tx.remove(&self.store, k)?;
|
||||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||||
|
|
||||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
self.instance.updated(tx, Some(&old_entry), None)?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
_ => Ok(false),
|
_ => Ok(false),
|
||||||
|
|
|
@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
|
||||||
self.data
|
self.data
|
||||||
.merkle_tree
|
.merkle_tree
|
||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
|
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
|
||||||
|
|
||||||
let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
|
let deleted = self.data.merkle_todo.db().transaction(|tx| {
|
||||||
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
|
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
|
||||||
if remove {
|
if remove {
|
||||||
tx.remove(&self.data.merkle_todo, k)?;
|
tx.remove(&self.data.merkle_todo, k)?;
|
||||||
|
|
|
@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
|
||||||
|
|
||||||
self.0.insert_many(values).await?;
|
self.0.insert_many(values).await?;
|
||||||
|
|
||||||
self.0.data.insert_queue.db().transaction(|mut tx| {
|
self.0.data.insert_queue.db().transaction(|tx| {
|
||||||
for (k, v) in kv_pairs.iter() {
|
for (k, v) in kv_pairs.iter() {
|
||||||
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
|
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
|
||||||
if &v2 == v {
|
if &v2 == v {
|
||||||
|
|
Loading…
Reference in a new issue