Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
12 changed files with 149 additions and 99 deletions
Showing only changes of commit 0543cb3453 - Show all commits

View file

@ -325,7 +325,11 @@ impl BlockManager {
/// Increment the number of time a block is used, putting it to resynchronization if it is /// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known /// required, but not known
pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { pub fn block_incref(
self: &Arc<Self>,
tx: &mut db::Transaction,
hash: Hash,
) -> db::TxOpResult<()> {
if self.rc.block_incref(tx, &hash)? { if self.rc.block_incref(tx, &hash)? {
// When the reference counter is incremented, there is // When the reference counter is incremented, there is
// normally a node that is responsible for sending us the // normally a node that is responsible for sending us the
@ -344,7 +348,11 @@ impl BlockManager {
} }
/// Decrement the number of time a block is used /// Decrement the number of time a block is used
pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { pub fn block_decref(
self: &Arc<Self>,
tx: &mut db::Transaction,
hash: Hash,
) -> db::TxOpResult<()> {
if self.rc.block_decref(tx, &hash)? { if self.rc.block_decref(tx, &hash)? {
// When the RC is decremented, it might drop to zero, // When the RC is decremented, it might drop to zero,
// indicating that we don't need the block. // indicating that we don't need the block.

View file

@ -19,7 +19,11 @@ impl BlockRc {
/// Increment the reference counter associated to a hash. /// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero. /// Returns true if the RC goes from zero to nonzero.
pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { pub(crate) fn block_incref(
&self,
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
match old_rc.increment().serialize() { match old_rc.increment().serialize() {
Some(x) => tx.insert(&self.rc, &hash, x)?, Some(x) => tx.insert(&self.rc, &hash, x)?,
@ -30,7 +34,11 @@ impl BlockRc {
/// Decrement the reference counter associated to a hash. /// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero. /// Returns true if the RC is now zero.
pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { pub(crate) fn block_decref(
&self,
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
match new_rc.serialize() { match new_rc.serialize() {
Some(x) => tx.insert(&self.rc, &hash, x)?, Some(x) => tx.insert(&self.rc, &hash, x)?,

View file

@ -34,6 +34,9 @@ pub struct Error(pub Cow<'static, str>);
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub struct TxOpError(pub(crate) Error);
pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
#[derive(Debug)] #[derive(Debug)]
pub enum TxError<E> { pub enum TxError<E> {
Abort(E), Abort(E),
@ -41,9 +44,17 @@ pub enum TxError<E> {
} }
pub type TxResult<R, E> = std::result::Result<R, TxError<E>>; pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
impl<E> From<Error> for TxError<E> { impl<E> From<TxOpError> for TxError<E> {
fn from(e: Error) -> TxError<E> { fn from(e: TxOpError) -> TxError<E> {
TxError::Db(e) TxError::Db(e.0)
}
}
pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
match res {
Ok(v) => Ok(Ok(v)),
Err(TxError::Abort(e)) => Ok(Err(e)),
Err(TxError::Db(e)) => Err(TxOpError(e)),
} }
} }
@ -117,19 +128,19 @@ impl Db {
let tx_res = self.transaction(|mut tx| { let tx_res = self.transaction(|mut tx| {
let mut i = 0; let mut i = 0;
for item in ex_tree.iter()? { for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item?; let (k, v) = item.map_err(TxError::Abort)?;
tx.insert(&tree, k, v)?; tx.insert(&tree, k, v)?;
i += 1; i += 1;
if i % 1000 == 0 { if i % 1000 == 0 {
println!("{}: imported {}", name, i); println!("{}: imported {}", name, i);
} }
} }
Ok::<_, TxError<()>>(i) tx.commit(i)
}); });
let total = match tx_res { let total = match tx_res {
Err(TxError::Db(e)) => return Err(e), Err(TxError::Db(e)) => return Err(e),
Err(TxError::Abort(_)) => unreachable!(), Err(TxError::Abort(e)) => return Err(e),
Ok(x) => x, Ok(x) => x,
}; };
@ -215,11 +226,11 @@ impl Tree {
#[allow(clippy::len_without_is_empty)] #[allow(clippy::len_without_is_empty)]
impl<'a> Transaction<'a> { impl<'a> Transaction<'a> {
#[inline] #[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value>> { pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.get(tree.1, key.as_ref()) self.0.get(tree.1, key.as_ref())
} }
#[inline] #[inline]
pub fn len(&self, tree: &Tree) -> Result<usize> { pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
self.0.len(tree.1) self.0.len(tree.1)
} }
@ -230,26 +241,26 @@ impl<'a> Transaction<'a> {
tree: &Tree, tree: &Tree,
key: T, key: T,
value: U, value: U,
) -> Result<Option<Value>> { ) -> TxOpResult<Option<Value>> {
self.0.insert(tree.1, key.as_ref(), value.as_ref()) self.0.insert(tree.1, key.as_ref(), value.as_ref())
} }
/// Returns the old value if there was one /// Returns the old value if there was one
#[inline] #[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<Option<Value>> { pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.remove(tree.1, key.as_ref()) self.0.remove(tree.1, key.as_ref())
} }
#[inline] #[inline]
pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'_>> { pub fn iter(&self, tree: &Tree) -> TxOpResult<ValueIter<'_>> {
self.0.iter(tree.1) self.0.iter(tree.1)
} }
#[inline] #[inline]
pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'_>> { pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<ValueIter<'_>> {
self.0.iter_rev(tree.1) self.0.iter_rev(tree.1)
} }
#[inline] #[inline]
pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>> pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<ValueIter<'_>>
where where
K: AsRef<[u8]>, K: AsRef<[u8]>,
R: RangeBounds<K>, R: RangeBounds<K>,
@ -259,7 +270,7 @@ impl<'a> Transaction<'a> {
self.0.range(tree.1, get_bound(sb), get_bound(eb)) self.0.range(tree.1, get_bound(sb), get_bound(eb))
} }
#[inline] #[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>> pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<ValueIter<'_>>
where where
K: AsRef<[u8]>, K: AsRef<[u8]>,
R: RangeBounds<K>, R: RangeBounds<K>,
@ -314,27 +325,27 @@ pub(crate) trait IDb: Send + Sync {
} }
pub(crate) trait ITx { pub(crate) trait ITx {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>; fn len(&self, tree: usize) -> TxOpResult<usize>;
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>;
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>>; fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; fn iter(&self, tree: usize) -> TxOpResult<ValueIter<'_>>;
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>; fn iter_rev(&self, tree: usize) -> TxOpResult<ValueIter<'_>>;
fn range<'r>( fn range<'r>(
&self, &self,
tree: usize, tree: usize,
low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>; ) -> TxOpResult<ValueIter<'_>>;
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
tree: usize, tree: usize,
low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>; ) -> TxOpResult<ValueIter<'_>>;
} }
pub(crate) trait ITxFn { pub(crate) trait ITxFn {

View file

@ -8,7 +8,10 @@ use std::sync::{Arc, RwLock};
use heed::types::ByteSlice; use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
Value, ValueIter,
};
pub use heed; pub use heed;
@ -20,9 +23,9 @@ impl From<heed::Error> for Error {
} }
} }
impl<T> From<heed::Error> for TxError<T> { impl From<heed::Error> for TxOpError {
fn from(e: heed::Error) -> TxError<T> { fn from(e: heed::Error) -> TxOpError {
TxError::Db(e.into()) TxOpError(e.into())
} }
} }
@ -171,21 +174,25 @@ impl IDb for LmdbDb {
let trees = self.trees.read().unwrap(); let trees = self.trees.read().unwrap();
let mut tx = LmdbTx { let mut tx = LmdbTx {
trees: &trees.0[..], trees: &trees.0[..],
tx: self.db.write_txn()?, tx: self
.db
.write_txn()
.map_err(Error::from)
.map_err(TxError::Db)?,
}; };
let res = f.try_on(&mut tx); let res = f.try_on(&mut tx);
match res { match res {
TxFnResult::Ok => { TxFnResult::Ok => {
tx.tx.commit()?; tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(()) Ok(())
} }
TxFnResult::Abort => { TxFnResult::Abort => {
tx.tx.abort()?; tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Abort(())) Err(TxError::Abort(()))
} }
TxFnResult::DbErr => { TxFnResult::DbErr => {
tx.tx.abort()?; tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Db(Error( Err(TxError::Db(Error(
"(this message will be discarded)".into(), "(this message will be discarded)".into(),
))) )))
@ -202,44 +209,44 @@ struct LmdbTx<'a, 'db> {
} }
impl<'a, 'db> LmdbTx<'a, 'db> { impl<'a, 'db> LmdbTx<'a, 'db> {
fn get_tree(&self, i: usize) -> Result<&Database> { fn get_tree(&self, i: usize) -> TxOpResult<&Database> {
self.trees.get(i).ok_or_else(|| { self.trees.get(i).ok_or_else(|| {
Error( TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(), "invalid tree id (it might have been openned after the transaction started)".into(),
) ))
}) })
} }
} }
impl<'a, 'db> ITx for LmdbTx<'a, 'db> { impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
match tree.get(&self.tx, key)? { match tree.get(&self.tx, key)? {
Some(v) => Ok(Some(v.to_vec())), Some(v) => Ok(Some(v.to_vec())),
None => Ok(None), None => Ok(None),
} }
} }
fn len(&self, _tree: usize) -> Result<usize> { fn len(&self, _tree: usize) -> TxOpResult<usize> {
unimplemented!(".len() in transaction not supported with LMDB backend") unimplemented!(".len() in transaction not supported with LMDB backend")
} }
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
let tree = *self.get_tree(tree)?; let tree = *self.get_tree(tree)?;
let old_val = tree.get(&self.tx, key)?.map(Vec::from); let old_val = tree.get(&self.tx, key)?.map(Vec::from);
tree.put(&mut self.tx, key, value)?; tree.put(&mut self.tx, key, value)?;
Ok(old_val) Ok(old_val)
} }
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = *self.get_tree(tree)?; let tree = *self.get_tree(tree)?;
let old_val = tree.get(&self.tx, key)?.map(Vec::from); let old_val = tree.get(&self.tx, key)?.map(Vec::from);
tree.delete(&mut self.tx, key)?; tree.delete(&mut self.tx, key)?;
Ok(old_val) Ok(old_val)
} }
fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); unimplemented!("Iterators in transactions not supported with LMDB backend");
} }
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); unimplemented!("Iterators in transactions not supported with LMDB backend");
} }
@ -248,7 +255,7 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); unimplemented!("Iterators in transactions not supported with LMDB backend");
} }
fn range_rev<'r>( fn range_rev<'r>(
@ -256,7 +263,7 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); unimplemented!("Iterators in transactions not supported with LMDB backend");
} }
} }

View file

@ -9,7 +9,10 @@ use sled::transaction::{
UnabortableTransactionError, UnabortableTransactionError,
}; };
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
Value, ValueIter,
};
pub use sled; pub use sled;
@ -21,6 +24,12 @@ impl From<sled::Error> for Error {
} }
} }
impl From<sled::Error> for TxOpError {
fn from(e: sled::Error) -> TxOpError {
TxOpError(e.into())
}
}
// -- db // -- db
pub struct SledDb { pub struct SledDb {
@ -177,51 +186,54 @@ struct SledTx<'a> {
} }
impl<'a> SledTx<'a> { impl<'a> SledTx<'a> {
fn get_tree(&self, i: usize) -> Result<&TransactionalTree> { fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> {
self.trees.get(i).ok_or_else(|| { self.trees.get(i).ok_or_else(|| {
Error( TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(), "invalid tree id (it might have been openned after the transaction started)".into(),
) ))
}) })
} }
fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> { fn save_error<R>(
&self,
v: std::result::Result<R, UnabortableTransactionError>,
) -> TxOpResult<R> {
match v { match v {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(e) => { Err(e) => {
let txt = format!("{}", e); let txt = format!("{}", e);
self.err.set(Some(e)); self.err.set(Some(e));
Err(Error(txt.into())) Err(TxOpError(Error(txt.into())))
} }
} }
} }
} }
impl<'a> ITx for SledTx<'a> { impl<'a> ITx for SledTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let tmp = self.save_error(tree.get(key))?; let tmp = self.save_error(tree.get(key))?;
Ok(tmp.map(|x| x.to_vec())) Ok(tmp.map(|x| x.to_vec()))
} }
fn len(&self, _tree: usize) -> Result<usize> { fn len(&self, _tree: usize) -> TxOpResult<usize> {
unimplemented!(".len() in transaction not supported with Sled backend") unimplemented!(".len() in transaction not supported with Sled backend")
} }
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let old_val = self.save_error(tree.insert(key, value))?; let old_val = self.save_error(tree.insert(key, value))?;
Ok(old_val.map(|x| x.to_vec())) Ok(old_val.map(|x| x.to_vec()))
} }
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let old_val = self.save_error(tree.remove(key))?; let old_val = self.save_error(tree.remove(key))?;
Ok(old_val.map(|x| x.to_vec())) Ok(old_val.map(|x| x.to_vec()))
} }
fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
@ -230,7 +242,7 @@ impl<'a> ITx for SledTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
fn range_rev<'r>( fn range_rev<'r>(
@ -238,7 +250,7 @@ impl<'a> ITx for SledTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
} }

View file

@ -10,7 +10,10 @@ use log::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction}; use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
Value, ValueIter,
};
pub use rusqlite; pub use rusqlite;
@ -22,9 +25,9 @@ impl From<rusqlite::Error> for Error {
} }
} }
impl<T> From<rusqlite::Error> for TxError<T> { impl From<rusqlite::Error> for TxOpError {
fn from(e: rusqlite::Error) -> TxError<T> { fn from(e: rusqlite::Error) -> TxOpError {
TxError::Db(e.into()) TxOpError(e.into())
} }
} }
@ -260,20 +263,24 @@ impl IDb for SqliteDb {
let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
let mut tx = SqliteTx { let mut tx = SqliteTx {
tx: this_mut_ref.db.transaction()?, tx: this_mut_ref
.db
.transaction()
.map_err(Error::from)
.map_err(TxError::Db)?,
trees: &this_mut_ref.trees, trees: &this_mut_ref.trees,
}; };
let res = match f.try_on(&mut tx) { let res = match f.try_on(&mut tx) {
TxFnResult::Ok => { TxFnResult::Ok => {
tx.tx.commit()?; tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(()) Ok(())
} }
TxFnResult::Abort => { TxFnResult::Abort => {
tx.tx.rollback()?; tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Abort(())) Err(TxError::Abort(()))
} }
TxFnResult::DbErr => { TxFnResult::DbErr => {
tx.tx.rollback()?; tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Db(Error( Err(TxError::Db(Error(
"(this message will be discarded)".into(), "(this message will be discarded)".into(),
))) )))
@ -293,15 +300,15 @@ struct SqliteTx<'a> {
} }
impl<'a> SqliteTx<'a> { impl<'a> SqliteTx<'a> {
fn get_tree(&self, i: usize) -> Result<&'_ str> { fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> {
self.trees.get(i).map(String::as_ref).ok_or_else(|| { self.trees.get(i).map(String::as_ref).ok_or_else(|| {
Error( TxOpError(Error(
"invalid tree id (it might have been openned after the transaction started)".into(), "invalid tree id (it might have been openned after the transaction started)".into(),
) ))
}) })
} }
fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> { fn internal_get(&self, tree: &str, key: &[u8]) -> TxOpResult<Option<Value>> {
let mut stmt = self let mut stmt = self
.tx .tx
.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
@ -314,11 +321,11 @@ impl<'a> SqliteTx<'a> {
} }
impl<'a> ITx for SqliteTx<'a> { impl<'a> ITx for SqliteTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
self.internal_get(tree, key) self.internal_get(tree, key)
} }
fn len(&self, tree: usize) -> Result<usize> { fn len(&self, tree: usize) -> TxOpResult<usize> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?; let mut res_iter = stmt.query([])?;
@ -328,7 +335,7 @@ impl<'a> ITx for SqliteTx<'a> {
} }
} }
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let old_val = self.internal_get(tree, key)?; let old_val = self.internal_get(tree, key)?;
@ -351,7 +358,7 @@ impl<'a> ITx for SqliteTx<'a> {
Ok(old_val) Ok(old_val)
} }
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let old_val = self.internal_get(tree, key)?; let old_val = self.internal_get(tree, key)?;
@ -365,10 +372,10 @@ impl<'a> ITx for SqliteTx<'a> {
Ok(old_val) Ok(old_val)
} }
fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
@ -377,7 +384,7 @@ impl<'a> ITx for SqliteTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
fn range_rev<'r>( fn range_rev<'r>(
@ -385,7 +392,7 @@ impl<'a> ITx for SqliteTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> { ) -> TxOpResult<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -121,7 +121,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
_tx: &mut db::Transaction, _tx: &mut db::Transaction,
_old: Option<&Self::E>, _old: Option<&Self::E>,
_new: Option<&Self::E>, _new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
// nothing for now // nothing for now
Ok(()) Ok(())
} }

View file

@ -227,7 +227,7 @@ impl TableSchema for K2VItemTable {
tx: &mut db::Transaction, tx: &mut db::Transaction,
old: Option<&Self::E>, old: Option<&Self::E>,
new: Option<&Self::E>, new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
// 1. Count // 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old { let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0), None => (0, 0, 0, 0),
@ -245,7 +245,7 @@ impl TableSchema for K2VItemTable {
.map(|e| &e.partition.partition_key) .map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key); .unwrap_or_else(|| &new.unwrap().partition.partition_key);
match self.counter_table.count( let counter_res = self.counter_table.count(
tx, tx,
&count_pk, &count_pk,
count_sk, count_sk,
@ -255,10 +255,8 @@ impl TableSchema for K2VItemTable {
(VALUES, new_values - old_values), (VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes), (BYTES, new_bytes - old_bytes),
], ],
) { );
Ok(()) => (), if let Err(e) = db::unabort(counter_res)? {
Err(db::TxError::Db(e)) => return Err(e),
Err(db::TxError::Abort(e)) => {
// This result can be returned by `counter_table.count()` for instance // This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step. // if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do. // Warn admin but ignore this error for now, that's all we can do.
@ -267,7 +265,6 @@ impl TableSchema for K2VItemTable {
count_pk, count_sk, e count_pk, count_sk, e
); );
} }
}
// 2. Notify // 2. Notify
if let Some(new_ent) = new { if let Some(new_ent) = new {

View file

@ -58,7 +58,7 @@ impl TableSchema for BlockRefTable {
tx: &mut db::Transaction, tx: &mut db::Transaction,
old: Option<&Self::E>, old: Option<&Self::E>,
new: Option<&Self::E>, new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
let block = old.or(new).unwrap().block; let block = old.or(new).unwrap().block;
let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);

View file

@ -239,7 +239,7 @@ impl TableSchema for ObjectTable {
_tx: &mut db::Transaction, _tx: &mut db::Transaction,
old: Option<&Self::E>, old: Option<&Self::E>,
new: Option<&Self::E>, new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
let version_table = self.version_table.clone(); let version_table = self.version_table.clone();
let old = old.cloned(); let old = old.cloned();
let new = new.cloned(); let new = new.cloned();

View file

@ -144,7 +144,7 @@ impl TableSchema for VersionTable {
_tx: &mut db::Transaction, _tx: &mut db::Transaction,
old: Option<&Self::E>, old: Option<&Self::E>,
new: Option<&Self::E>, new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
let block_ref_table = self.block_ref_table.clone(); let block_ref_table = self.block_ref_table.clone();
let old = old.cloned(); let old = old.cloned();
let new = new.cloned(); let new = new.cloned();

View file

@ -93,7 +93,7 @@ pub trait TableSchema: Send + Sync {
_tx: &mut db::Transaction, _tx: &mut db::Transaction,
_old: Option<&Self::E>, _old: Option<&Self::E>,
_new: Option<&Self::E>, _new: Option<&Self::E>,
) -> db::Result<()> { ) -> db::TxOpResult<()> {
Ok(()) Ok(())
} }