Start LMDB adapter, with fixed semantics
Some checks reported errors
continuous-integration/drone/push Build was killed
continuous-integration/drone/pr Build was killed

This commit is contained in:
Alex 2022-06-03 15:31:07 +02:00
parent 16e0a655d0
commit 4e72c713f1
Signed by: lx
GPG key ID: 0E496D15096376BE
12 changed files with 394 additions and 84 deletions

23
Cargo.lock generated
View file

@ -1017,6 +1017,7 @@ dependencies = [
"clap 3.1.18", "clap 3.1.18",
"err-derive 0.3.1", "err-derive 0.3.1",
"hexdump", "hexdump",
"lmdb",
"log", "log",
"mktemp", "mktemp",
"rusqlite", "rusqlite",
@ -1822,6 +1823,28 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lmdb"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539"
dependencies = [
"bitflags",
"libc",
"lmdb-sys",
]
[[package]]
name = "lmdb-sys"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.6" version = "0.4.6"

View file

@ -20,7 +20,7 @@ impl BlockRc {
/// Increment the reference counter associated to a hash. /// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero. /// Returns true if the RC goes from zero to nonzero.
pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
let old_rc = self.rc.db().transaction(|tx| { let old_rc = self.rc.db().transaction(|mut tx| {
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
match old_rc.increment().serialize() { match old_rc.increment().serialize() {
Some(x) => { Some(x) => {
@ -36,7 +36,7 @@ impl BlockRc {
/// Decrement the reference counter associated to a hash. /// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero. /// Returns true if the RC is now zero.
pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
let new_rc = self.rc.db().transaction(|tx| { let new_rc = self.rc.db().transaction(|mut tx| {
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
match new_rc.serialize() { match new_rc.serialize() {
Some(x) => { Some(x) => {
@ -60,7 +60,7 @@ impl BlockRc {
/// deletion time has passed /// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec(); let now = now_msec();
self.rc.db().transaction(|tx| { self.rc.db().transaction(|mut tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
match rcval { match rcval {
RcEntry::Deletable { at_time } if now > at_time => { RcEntry::Deletable { at_time } if now > at_time => {

View file

@ -21,8 +21,9 @@ err-derive = "0.3"
hexdump = "0.1" hexdump = "0.1"
log = "0.4" log = "0.4"
sled = "0.34" lmdb = "0.8"
rusqlite = "0.27" rusqlite = "0.27"
sled = "0.34"
# cli deps # cli deps
clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }

View file

@ -1,3 +1,4 @@
pub mod lmdb_adapter;
pub mod sled_adapter; pub mod sled_adapter;
pub mod sqlite_adapter; pub mod sqlite_adapter;
@ -15,8 +16,7 @@ use err_derive::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>); pub struct Db(pub(crate) Arc<dyn IDb>);
#[derive(Clone, Copy)] pub struct Transaction<'a>(pub(crate) &'a mut dyn ITx);
pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
#[derive(Clone)] #[derive(Clone)]
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize); pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
@ -271,7 +271,7 @@ impl Tree {
#[allow(clippy::len_without_is_empty)] #[allow(clippy::len_without_is_empty)]
impl<'a> Transaction<'a> { impl<'a> Transaction<'a> {
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> { pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'_>>> {
self.0.get(tree.1, key.as_ref()) self.0.get(tree.1, key.as_ref())
} }
pub fn len(&self, tree: &Tree) -> Result<usize> { pub fn len(&self, tree: &Tree) -> Result<usize> {
@ -279,25 +279,25 @@ impl<'a> Transaction<'a> {
} }
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>( pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
&self, &mut self,
tree: &Tree, tree: &Tree,
key: T, key: T,
value: U, value: U,
) -> Result<()> { ) -> Result<()> {
self.0.insert(tree.1, key.as_ref(), value.as_ref()) self.0.insert(tree.1, key.as_ref(), value.as_ref())
} }
pub fn remove<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<bool> { pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<bool> {
self.0.remove(tree.1, key.as_ref()) self.0.remove(tree.1, key.as_ref())
} }
pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'a>> { pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'_>> {
self.0.iter(tree.1) self.0.iter(tree.1)
} }
pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'a>> { pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'_>> {
self.0.iter_rev(tree.1) self.0.iter_rev(tree.1)
} }
pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'a>> pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>>
where where
K: AsRef<[u8]>, K: AsRef<[u8]>,
R: RangeBounds<K>, R: RangeBounds<K>,
@ -306,7 +306,7 @@ impl<'a> Transaction<'a> {
let eb = range.end_bound(); let eb = range.end_bound();
self.0.range(tree.1, get_bound(sb), get_bound(eb)) self.0.range(tree.1, get_bound(sb), get_bound(eb))
} }
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'a>> pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>>
where where
K: AsRef<[u8]>, K: AsRef<[u8]>,
R: RangeBounds<K>, R: RangeBounds<K>,
@ -358,32 +358,32 @@ pub(crate) trait IDb: Send + Sync {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
} }
pub(crate) trait ITx<'a> { pub(crate) trait ITx {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>>;
fn len(&self, tree: usize) -> Result<usize>; fn len(&self, tree: usize) -> Result<usize>;
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>; fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool>;
fn iter(&self, tree: usize) -> Result<ValueIter<'a>>; fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'a>>; fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
fn range<'r>( fn range<'r>(
&self, &self,
tree: usize, tree: usize,
low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>; ) -> Result<ValueIter<'_>>;
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
tree: usize, tree: usize,
low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>; ) -> Result<ValueIter<'_>>;
} }
pub(crate) trait ITxFn { pub(crate) trait ITxFn {
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
} }
pub(crate) enum TxFnResult { pub(crate) enum TxFnResult {
@ -404,7 +404,7 @@ impl<F, R, E> ITxFn for TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(Transaction<'_>) -> TxResult<R, E>,
{ {
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult { fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
let res = (self.function)(Transaction(tx)); let res = (self.function)(Transaction(tx));
let res2 = match &res { let res2 = match &res {
Ok(_) => TxFnResult::Ok, Ok(_) => TxFnResult::Ok,

270
src/db/lmdb_adapter.rs Normal file
View file

@ -0,0 +1,270 @@
use core::marker::PhantomPinned;
use core::ops::Bound;
use core::pin::Pin;
use core::ptr::NonNull;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use lmdb::{
Database, DatabaseFlags, Environment, RoTransaction, RwTransaction, Transaction, WriteFlags,
};
use crate::{
Db, Error, IDb, ITx, ITxFn, IValue, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
};
pub use lmdb;
// -- err
impl From<lmdb::Error> for Error {
fn from(e: lmdb::Error) -> Error {
Error(format!("LMDB: {}", e).into())
}
}
impl<T> From<lmdb::Error> for TxError<T> {
fn from(e: lmdb::Error) -> TxError<T> {
TxError::Db(e.into())
}
}
// -- db
pub struct LmdbDb {
db: lmdb::Environment,
trees: RwLock<(Vec<lmdb::Database>, HashMap<String, usize>)>,
}
impl LmdbDb {
pub fn init(db: lmdb::Environment) -> Db {
let s = Self {
db,
trees: RwLock::new((Vec::new(), HashMap::new())),
};
Db(Arc::new(s))
}
fn get_tree(&self, i: usize) -> Result<lmdb::Database> {
self.trees
.read()
.unwrap()
.0
.get(i)
.cloned()
.ok_or_else(|| Error("invalid tree id".into()))
}
}
impl IDb for LmdbDb {
fn open_tree(&self, name: &str) -> Result<usize> {
let mut trees = self.trees.write().unwrap();
if let Some(i) = trees.1.get(name) {
Ok(*i)
} else {
let tree = self.db.create_db(Some(name), DatabaseFlags::empty())?;
let i = trees.0.len();
trees.0.push(tree);
trees.1.insert(name.to_string(), i);
Ok(i)
}
}
fn list_trees(&self) -> Result<Vec<String>> {
unimplemented!()
}
// ----
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
let res = TxAndValue {
tx: self.db.begin_ro_txn()?,
value: NonNull::dangling(),
_pin: PhantomPinned,
};
let mut boxed = Box::pin(res);
unsafe {
let tx = NonNull::from(&boxed.tx);
let val = match tx.as_ref().get(tree, &key) {
Err(lmdb::Error::NotFound) => return Ok(None),
v => v?,
};
let mut_ref: Pin<&mut TxAndValue<'_>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).value = NonNull::from(&val);
}
Ok(Some(Value(Box::new(TxAndValuePin(boxed)))))
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
unimplemented!()
}
fn len(&self, tree: usize) -> Result<usize> {
unimplemented!()
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
let mut tx = self.db.begin_rw_txn()?;
tx.put(tree, &key, &value, WriteFlags::empty())?;
tx.commit()?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
unimplemented!()
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
unimplemented!()
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
unimplemented!()
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
unimplemented!()
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap();
let mut tx = LmdbTx {
trees: &trees.0[..],
tx: self.db.begin_rw_txn()?,
};
let res = f.try_on(&mut tx);
match res {
TxFnResult::Ok => {
tx.tx.commit()?;
Ok(())
}
TxFnResult::Abort => {
tx.tx.abort();
Err(TxError::Abort(()))
}
TxFnResult::DbErr => {
tx.tx.abort();
Err(TxError::Db(Error(
"(this message will be discarded)".into(),
)))
}
}
}
}
// ----
struct LmdbTx<'a, 'db> {
trees: &'db [Database],
tx: RwTransaction<'a>,
}
impl<'a, 'db> LmdbTx<'a, 'db> {
fn get_tree(&self, i: usize) -> Result<&Database> {
self.trees.get(i).ok_or_else(|| {
Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
)
})
}
}
impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
match self.tx.get::<'a, _>(*tree, &key) {
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(e.into()),
Ok(v) => Ok(Some(Value(Box::new(v)))),
}
}
fn len(&self, _tree: usize) -> Result<usize> {
unimplemented!(".len() in transaction not supported with LMDB backend")
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
self.tx.put(*tree, &key, &value, WriteFlags::empty())?;
Ok(())
}
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
match self.tx.del::<'a, _>(*tree, &key, None) {
Ok(()) => Ok(true),
Err(lmdb::Error::NotFound) => Ok(false),
Err(e) => Err(e.into()),
}
}
fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend");
}
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend");
}
fn range<'r>(
&self,
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend");
}
fn range_rev<'r>(
&self,
_tree: usize,
_low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend");
}
}
// ----
struct TxAndValue<'a> {
tx: RoTransaction<'a>,
value: NonNull<&'a [u8]>,
_pin: PhantomPinned,
}
struct TxAndValuePin<'a>(Pin<Box<TxAndValue<'a>>>);
impl<'a> IValue<'a> for TxAndValuePin<'a> {
fn take_maybe(&mut self) -> Vec<u8> {
self.as_ref().to_vec()
}
}
impl<'a> AsRef<[u8]> for TxAndValuePin<'a> {
fn as_ref(&self) -> &[u8] {
unsafe { self.0.value.as_ref() }
}
}
impl<'a> std::borrow::Borrow<[u8]> for TxAndValuePin<'a> {
fn borrow(&self) -> &[u8] {
self.as_ref()
}
}

View file

@ -19,7 +19,7 @@ pub use sled;
impl From<sled::Error> for Error { impl From<sled::Error> for Error {
fn from(e: sled::Error) -> Error { fn from(e: sled::Error) -> Error {
Error(format!("{}", e).into()) Error(format!("Sled: {}", e).into())
} }
} }
@ -162,11 +162,11 @@ impl IDb for SledDb {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap(); let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| { let res = trees.0.transaction(|txtrees| {
let tx = SledTx { let mut tx = SledTx {
trees: txtrees, trees: txtrees,
err: Cell::new(None), err: Cell::new(None),
}; };
match f.try_on(&tx) { match f.try_on(&mut tx) {
TxFnResult::Ok => { TxFnResult::Ok => {
assert!(tx.err.into_inner().is_none()); assert!(tx.err.into_inner().is_none());
Ok(()) Ok(())
@ -217,8 +217,8 @@ impl<'a> SledTx<'a> {
} }
} }
impl<'a> ITx<'a> for SledTx<'a> { impl<'a> ITx for SledTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let tmp = self.save_error(tree.get(key))?; let tmp = self.save_error(tree.get(key))?;
Ok(tmp.map(From::from)) Ok(tmp.map(From::from))
@ -227,20 +227,20 @@ impl<'a> ITx<'a> for SledTx<'a> {
unimplemented!(".len() in transaction not supported with Sled backend") unimplemented!(".len() in transaction not supported with Sled backend")
} }
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
self.save_error(tree.insert(key, value))?; self.save_error(tree.insert(key, value))?;
Ok(()) Ok(())
} }
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
Ok(self.save_error(tree.remove(key))?.is_some()) Ok(self.save_error(tree.remove(key))?.is_some())
} }
fn iter(&self, _tree: usize) -> Result<ValueIter<'a>> { fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'a>> { fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
@ -249,7 +249,7 @@ impl<'a> ITx<'a> for SledTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> { ) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
fn range_rev<'r>( fn range_rev<'r>(
@ -257,7 +257,7 @@ impl<'a> ITx<'a> for SledTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> { ) -> Result<ValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with Sled backend"); unimplemented!("Iterators in transactions not supported with Sled backend");
} }
} }

View file

@ -17,7 +17,7 @@ pub use rusqlite;
impl From<rusqlite::Error> for Error { impl From<rusqlite::Error> for Error {
fn from(e: rusqlite::Error) -> Error { fn from(e: rusqlite::Error) -> Error {
Error(format!("{}", e).into()) Error(format!("Sqlite: {}", e).into())
} }
} }
@ -235,11 +235,11 @@ impl IDb for SqliteDb {
let mut db = self.db.lock().unwrap(); let mut db = self.db.lock().unwrap();
trace!("transaction: lock acquired"); trace!("transaction: lock acquired");
let tx = SqliteTx { let mut tx = SqliteTx {
tx: db.transaction()?, tx: db.transaction()?,
trees: trees.as_ref(), trees: trees.as_ref(),
}; };
let res = match f.try_on(&tx) { let res = match f.try_on(&mut tx) {
TxFnResult::Ok => { TxFnResult::Ok => {
tx.tx.commit()?; tx.tx.commit()?;
Ok(()) Ok(())
@ -278,8 +278,8 @@ impl<'a> SqliteTx<'a> {
} }
} }
impl<'a> ITx<'a> for SqliteTx<'a> { impl<'a> ITx for SqliteTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let mut stmt = self let mut stmt = self
.tx .tx
@ -300,7 +300,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
} }
} }
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
self.tx.execute( self.tx.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
@ -308,7 +308,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
)?; )?;
Ok(()) Ok(())
} }
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let res = self let res = self
.tx .tx
@ -316,10 +316,10 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
Ok(res > 0) Ok(res > 0)
} }
fn iter(&self, _tree: usize) -> Result<ValueIter<'a>> { fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'a>> { fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
@ -328,7 +328,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> { ) -> Result<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
fn range_rev<'r>( fn range_rev<'r>(
@ -336,7 +336,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
_tree: usize, _tree: usize,
_low: Bound<&'r [u8]>, _low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, _high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> { ) -> Result<ValueIter<'_>> {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -1,5 +1,6 @@
use crate::*; use crate::*;
use crate::lmdb_adapter::LmdbDb;
use crate::sled_adapter::SledDb; use crate::sled_adapter::SledDb;
use crate::sqlite_adapter::SqliteDb; use crate::sqlite_adapter::SqliteDb;
@ -16,7 +17,7 @@ fn test_suite(db: Db) {
tree.insert(ka, va).unwrap(); tree.insert(ka, va).unwrap();
assert_eq!(tree.get(ka).unwrap().unwrap(), va); assert_eq!(tree.get(ka).unwrap().unwrap(), va);
let res = db.transaction::<_, (), _>(|tx| { let res = db.transaction::<_, (), _>(|mut tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
tx.insert(&tree, ka, vb).unwrap(); tx.insert(&tree, ka, vb).unwrap();
@ -28,7 +29,7 @@ fn test_suite(db: Db) {
assert!(matches!(res, Ok(12))); assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb); assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
let res = db.transaction::<(), _, _>(|tx| { let res = db.transaction::<(), _, _>(|mut tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
tx.insert(&tree, ka, vc).unwrap(); tx.insert(&tree, ka, vc).unwrap();
@ -78,6 +79,18 @@ fn test_suite(db: Db) {
drop(iter); drop(iter);
} }
#[test]
fn test_lmdb_db() {
let path = mktemp::Temp::new_dir().unwrap();
let db = lmdb::Environment::new()
.set_max_dbs(100)
.open(&path)
.unwrap();
let db = LmdbDb::init(db);
test_suite(db);
drop(path);
}
#[test] #[test]
fn test_sled_db() { fn test_sled_db() {
let path = mktemp::Temp::new_dir().unwrap(); let path = mktemp::Temp::new_dir().unwrap();

View file

@ -179,7 +179,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk); let tree_key = self.table.data.tree_key(pk, sk);
let new_entry = self.local_counter.db().transaction(|tx| { let new_entry = self.local_counter.db().transaction(|mut tx| {
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => rmp_serde::decode::from_slice::<LocalCounterEntry>(&old_bytes) Some(old_bytes) => rmp_serde::decode::from_slice::<LocalCounterEntry>(&old_bytes)
.map_err(Error::RmpDecode) .map_err(Error::RmpDecode)

View file

@ -182,7 +182,7 @@ where
tree_key: &[u8], tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E, f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let changed = self.store.db().transaction(|tx| { let changed = self.store.db().transaction(|mut tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => { Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
@ -203,6 +203,7 @@ where
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?; .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
drop(old_bytes);
if value_changed || encoding_changed { if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]); let new_bytes_hash = blake2sum(&new_bytes[..]);
@ -241,15 +242,16 @@ where
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = self.store.db().transaction(|tx| { let removed = self.store.db().transaction(|mut tx| {
if let Some(cur_v) = tx.get(&self.store, k)? { let remove = match tx.get(&self.store, k)? {
if cur_v == v { Some(cur_v) if cur_v == v => true,
_ => false,
};
if remove {
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
return Ok(true);
} }
} Ok(remove)
Ok(false)
})?; })?;
if removed { if removed {
@ -267,15 +269,16 @@ where
k: &[u8], k: &[u8],
vhash: Hash, vhash: Hash,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let removed = self.store.db().transaction(|tx| { let removed = self.store.db().transaction(|mut tx| {
if let Some(cur_v) = tx.get(&self.store, k)? { let remove_v = match tx.get(&self.store, k)? {
if blake2sum(&cur_v[..]) == vhash { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v.into_vec()),
_ => None,
};
if remove_v.is_some() {
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
return Ok(Some(cur_v.into_vec()));
} }
} Ok(remove_v)
Ok(None)
})?; })?;
if let Some(old_v) = removed { if let Some(old_v) = removed {

View file

@ -376,14 +376,14 @@ impl GcTodoEntry {
/// what we have to do is still the same /// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
let key = self.todo_table_key(); let key = self.todo_table_key();
gc_todo_tree.db().transaction(|tx| { gc_todo_tree.db().transaction(|mut tx| {
let old_val = tx.get(gc_todo_tree, &key)?; let remove = match tx.get(gc_todo_tree, &key)? {
match old_val { Some(ov) if ov == self.value_hash.as_slice() => true,
Some(ov) if ov == self.value_hash.as_slice() => { _ => false,
};
if remove {
tx.remove(gc_todo_tree, &key)?; tx.remove(gc_todo_tree, &key)?;
} }
_ => (),
}
tx.commit(()) tx.commit(())
})?; })?;
Ok(()) Ok(())

View file

@ -137,17 +137,17 @@ where
self.data self.data
.merkle_tree .merkle_tree
.db() .db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|tx| { let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
let old_val = tx.get(&self.data.merkle_todo, k)?; let remove = match tx.get(&self.data.merkle_todo, k)? {
match old_val { Some(ov) if ov == vhash_by => true,
Some(ov) if ov == vhash_by => { _ => false,
};
if remove {
tx.remove(&self.data.merkle_todo, k)?; tx.remove(&self.data.merkle_todo, k)?;
tx.commit(true)
}
_ => tx.commit(false),
} }
Ok(remove)
})?; })?;
if !deleted { if !deleted {
@ -162,7 +162,7 @@ where
fn update_item_rec( fn update_item_rec(
&self, &self,
tx: db::Transaction<'_>, tx: &mut db::Transaction<'_>,
k: &[u8], k: &[u8],
khash: &Hash, khash: &Hash,
key: &MerkleNodeKey, key: &MerkleNodeKey,
@ -288,7 +288,7 @@ where
fn read_node_txn( fn read_node_txn(
&self, &self,
tx: db::Transaction<'_>, tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey, k: &MerkleNodeKey,
) -> db::TxResult<MerkleNode, Error> { ) -> db::TxResult<MerkleNode, Error> {
let ent = tx.get(&self.data.merkle_tree, k.encode())?; let ent = tx.get(&self.data.merkle_tree, k.encode())?;
@ -297,7 +297,7 @@ where
fn put_node_txn( fn put_node_txn(
&self, &self,
tx: db::Transaction<'_>, tx: &mut db::Transaction<'_>,
k: &MerkleNodeKey, k: &MerkleNodeKey,
v: &MerkleNode, v: &MerkleNode,
) -> db::TxResult<Hash, Error> { ) -> db::TxResult<Hash, Error> {