From 4e72c713f157ae9d5103a461c4c213b2aa6a84b9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 3 Jun 2022 15:31:07 +0200 Subject: [PATCH] Start LMDB adapter, with fixed semantics --- Cargo.lock | 23 ++++ src/block/rc.rs | 6 +- src/db/Cargo.toml | 3 +- src/db/lib.rs | 38 +++--- src/db/lmdb_adapter.rs | 270 +++++++++++++++++++++++++++++++++++++ src/db/sled_adapter.rs | 22 +-- src/db/sqlite_adapter.rs | 22 +-- src/db/test.rs | 17 ++- src/model/index_counter.rs | 2 +- src/table/data.rs | 37 ++--- src/table/gc.rs | 14 +- src/table/merkle.rs | 24 ++-- 12 files changed, 394 insertions(+), 84 deletions(-) create mode 100644 src/db/lmdb_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index a953779b..b051e72e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1017,6 +1017,7 @@ dependencies = [ "clap 3.1.18", "err-derive 0.3.1", "hexdump", + "lmdb", "log", "mktemp", "rusqlite", @@ -1822,6 +1823,28 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lmdb" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" +dependencies = [ + "bitflags", + "libc", + "lmdb-sys", +] + +[[package]] +name = "lmdb-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.4.6" diff --git a/src/block/rc.rs b/src/block/rc.rs index 7d85f67e..e0b952fd 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -20,7 +20,7 @@ impl BlockRc { /// Increment the reference counter associated to a hash. /// Returns true if the RC goes from zero to nonzero. pub(crate) fn block_incref(&self, hash: &Hash) -> Result { - let old_rc = self.rc.db().transaction(|tx| { + let old_rc = self.rc.db().transaction(|mut tx| { let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); match old_rc.increment().serialize() { Some(x) => { @@ -36,7 +36,7 @@ impl BlockRc { /// Decrement the reference counter associated to a hash. /// Returns true if the RC is now zero. pub(crate) fn block_decref(&self, hash: &Hash) -> Result { - let new_rc = self.rc.db().transaction(|tx| { + let new_rc = self.rc.db().transaction(|mut tx| { let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); match new_rc.serialize() { Some(x) => { @@ -60,7 +60,7 @@ impl BlockRc { /// deletion time has passed pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); - self.rc.db().transaction(|tx| { + self.rc.db().transaction(|mut tx| { let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); match rcval { RcEntry::Deletable { at_time } if now > at_time => { diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 36b96229..b4601ff7 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -21,8 +21,9 @@ err-derive = "0.3" hexdump = "0.1" log = "0.4" -sled = "0.34" +lmdb = "0.8" rusqlite = "0.27" +sled = "0.34" # cli deps clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } diff --git a/src/db/lib.rs b/src/db/lib.rs index 045c16c5..86042eaf 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -1,3 +1,4 @@ +pub mod lmdb_adapter; pub mod sled_adapter; pub mod sqlite_adapter; @@ -15,8 +16,7 @@ use err_derive::Error; #[derive(Clone)] pub struct Db(pub(crate) Arc); -#[derive(Clone, Copy)] -pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); +pub struct Transaction<'a>(pub(crate) &'a mut dyn ITx); #[derive(Clone)] pub struct Tree(pub(crate) Arc, pub(crate) usize); @@ -271,7 +271,7 @@ impl Tree { #[allow(clippy::len_without_is_empty)] impl<'a> Transaction<'a> { - pub fn get>(&self, tree: &Tree, key: T) -> Result>> { + pub fn get>(&self, tree: &Tree, key: T) -> Result>> { self.0.get(tree.1, key.as_ref()) } pub fn len(&self, tree: &Tree) -> Result { @@ -279,25 +279,25 @@ impl<'a> Transaction<'a> { } pub fn insert, U: AsRef<[u8]>>( - &self, + &mut self, tree: &Tree, key: T, value: U, ) -> Result<()> { self.0.insert(tree.1, key.as_ref(), value.as_ref()) } - pub fn remove>(&self, tree: &Tree, key: T) -> Result { + pub fn remove>(&mut self, tree: &Tree, key: T) -> Result { self.0.remove(tree.1, key.as_ref()) } - pub fn iter(&self, tree: &Tree) -> Result> { + pub fn iter(&self, tree: &Tree) -> Result> { self.0.iter(tree.1) } - pub fn iter_rev(&self, tree: &Tree) -> Result> { + pub fn iter_rev(&self, tree: &Tree) -> Result> { self.0.iter_rev(tree.1) } - pub fn range(&self, tree: &Tree, range: R) -> Result> + pub fn range(&self, tree: &Tree, range: R) -> Result> where K: AsRef<[u8]>, R: RangeBounds, @@ -306,7 +306,7 @@ impl<'a> Transaction<'a> { let eb = range.end_bound(); self.0.range(tree.1, get_bound(sb), get_bound(eb)) } - pub fn range_rev(&self, tree: &Tree, range: R) -> Result> + pub fn range_rev(&self, tree: &Tree, range: R) -> Result> where K: AsRef<[u8]>, R: RangeBounds, @@ -358,32 +358,32 @@ pub(crate) trait IDb: Send + Sync { fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; } -pub(crate) trait ITx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result>>; +pub(crate) trait ITx { + fn get(&self, tree: usize, key: &[u8]) -> Result>>; fn len(&self, tree: usize) -> Result; - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; - fn remove(&self, tree: usize, key: &[u8]) -> Result; + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + fn remove(&mut self, tree: usize, key: &[u8]) -> Result; - fn iter(&self, tree: usize) -> Result>; - fn iter_rev(&self, tree: usize) -> Result>; + fn iter(&self, tree: usize) -> Result>; + fn iter_rev(&self, tree: usize) -> Result>; fn range<'r>( &self, tree: usize, low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, - ) -> Result>; + ) -> Result>; fn range_rev<'r>( &self, tree: usize, low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, - ) -> Result>; + ) -> Result>; } pub(crate) trait ITxFn { - fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult; } pub(crate) enum TxFnResult { @@ -404,7 +404,7 @@ impl ITxFn for TxFn where F: Fn(Transaction<'_>) -> TxResult, { - fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult { + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { let res = (self.function)(Transaction(tx)); let res2 = match &res { Ok(_) => TxFnResult::Ok, diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs new file mode 100644 index 00000000..caf21517 --- /dev/null +++ b/src/db/lmdb_adapter.rs @@ -0,0 +1,270 @@ +use core::marker::PhantomPinned; +use core::ops::Bound; +use core::pin::Pin; +use core::ptr::NonNull; + +use std::cell::RefCell; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use lmdb::{ + Database, DatabaseFlags, Environment, RoTransaction, RwTransaction, Transaction, WriteFlags, +}; + +use crate::{ + Db, Error, IDb, ITx, ITxFn, IValue, Result, TxError, TxFnResult, TxResult, Value, ValueIter, +}; + +pub use lmdb; + +// -- err + +impl From for Error { + fn from(e: lmdb::Error) -> Error { + Error(format!("LMDB: {}", e).into()) + } +} + +impl From for TxError { + fn from(e: lmdb::Error) -> TxError { + TxError::Db(e.into()) + } +} + +// -- db + +pub struct LmdbDb { + db: lmdb::Environment, + trees: RwLock<(Vec, HashMap)>, +} + +impl LmdbDb { + pub fn init(db: lmdb::Environment) -> Db { + let s = Self { + db, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or_else(|| Error("invalid tree id".into())) + } +} + +impl IDb for LmdbDb { + fn open_tree(&self, name: &str) -> Result { + let mut trees = self.trees.write().unwrap(); + if let Some(i) = trees.1.get(name) { + Ok(*i) + } else { + let tree = self.db.create_db(Some(name), DatabaseFlags::empty())?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(name.to_string(), i); + Ok(i) + } + } + + fn list_trees(&self) -> Result> { + unimplemented!() + } + + // ---- + + fn get(&self, tree: usize, key: &[u8]) -> Result>> { + let tree = self.get_tree(tree)?; + + let res = TxAndValue { + tx: self.db.begin_ro_txn()?, + value: NonNull::dangling(), + _pin: PhantomPinned, + }; + let mut boxed = Box::pin(res); + + unsafe { + let tx = NonNull::from(&boxed.tx); + let val = match tx.as_ref().get(tree, &key) { + Err(lmdb::Error::NotFound) => return Ok(None), + v => v?, + }; + + let mut_ref: Pin<&mut TxAndValue<'_>> = Pin::as_mut(&mut boxed); + Pin::get_unchecked_mut(mut_ref).value = NonNull::from(&val); + } + + Ok(Some(Value(Box::new(TxAndValuePin(boxed))))) + } + + fn remove(&self, tree: usize, key: &[u8]) -> Result { + unimplemented!() + } + + fn len(&self, tree: usize) -> Result { + unimplemented!() + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.get_tree(tree)?; + let mut tx = self.db.begin_rw_txn()?; + tx.put(tree, &key, &value, WriteFlags::empty())?; + tx.commit()?; + Ok(()) + } + + fn iter(&self, tree: usize) -> Result> { + unimplemented!() + } + + fn iter_rev(&self, tree: usize) -> Result> { + unimplemented!() + } + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + unimplemented!() + } + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + unimplemented!() + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + let trees = self.trees.read().unwrap(); + let mut tx = LmdbTx { + trees: &trees.0[..], + tx: self.db.begin_rw_txn()?, + }; + + let res = f.try_on(&mut tx); + match res { + TxFnResult::Ok => { + tx.tx.commit()?; + Ok(()) + } + TxFnResult::Abort => { + tx.tx.abort(); + Err(TxError::Abort(())) + } + TxFnResult::DbErr => { + tx.tx.abort(); + Err(TxError::Db(Error( + "(this message will be discarded)".into(), + ))) + } + } + } +} + +// ---- + +struct LmdbTx<'a, 'db> { + trees: &'db [Database], + tx: RwTransaction<'a>, +} + +impl<'a, 'db> LmdbTx<'a, 'db> { + fn get_tree(&self, i: usize) -> Result<&Database> { + self.trees.get(i).ok_or_else(|| { + Error( + "invalid tree id (it might have been openned after the transaction started)".into(), + ) + }) + } +} + +impl<'a, 'db> ITx for LmdbTx<'a, 'db> { + fn get(&self, tree: usize, key: &[u8]) -> Result>> { + let tree = self.get_tree(tree)?; + match self.tx.get::<'a, _>(*tree, &key) { + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e.into()), + Ok(v) => Ok(Some(Value(Box::new(v)))), + } + } + fn len(&self, _tree: usize) -> Result { + unimplemented!(".len() in transaction not supported with LMDB backend") + } + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.get_tree(tree)?; + self.tx.put(*tree, &key, &value, WriteFlags::empty())?; + Ok(()) + } + fn remove(&mut self, tree: usize, key: &[u8]) -> Result { + let tree = self.get_tree(tree)?; + match self.tx.del::<'a, _>(*tree, &key, None) { + Ok(()) => Ok(true), + Err(lmdb::Error::NotFound) => Ok(false), + Err(e) => Err(e.into()), + } + } + + fn iter(&self, _tree: usize) -> Result> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + fn iter_rev(&self, _tree: usize) -> Result> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + + fn range<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> Result> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } + fn range_rev<'r>( + &self, + _tree: usize, + _low: Bound<&'r [u8]>, + _high: Bound<&'r [u8]>, + ) -> Result> { + unimplemented!("Iterators in transactions not supported with LMDB backend"); + } +} + +// ---- + +struct TxAndValue<'a> { + tx: RoTransaction<'a>, + value: NonNull<&'a [u8]>, + _pin: PhantomPinned, +} + +struct TxAndValuePin<'a>(Pin>>); + +impl<'a> IValue<'a> for TxAndValuePin<'a> { + fn take_maybe(&mut self) -> Vec { + self.as_ref().to_vec() + } +} + +impl<'a> AsRef<[u8]> for TxAndValuePin<'a> { + fn as_ref(&self) -> &[u8] { + unsafe { self.0.value.as_ref() } + } +} + +impl<'a> std::borrow::Borrow<[u8]> for TxAndValuePin<'a> { + fn borrow(&self) -> &[u8] { + self.as_ref() + } +} diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index 3388b0ca..2953785e 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -19,7 +19,7 @@ pub use sled; impl From for Error { fn from(e: sled::Error) -> Error { - Error(format!("{}", e).into()) + Error(format!("Sled: {}", e).into()) } } @@ -162,11 +162,11 @@ impl IDb for SledDb { fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { let trees = self.trees.read().unwrap(); let res = trees.0.transaction(|txtrees| { - let tx = SledTx { + let mut tx = SledTx { trees: txtrees, err: Cell::new(None), }; - match f.try_on(&tx) { + match f.try_on(&mut tx) { TxFnResult::Ok => { assert!(tx.err.into_inner().is_none()); Ok(()) @@ -217,8 +217,8 @@ impl<'a> SledTx<'a> { } } -impl<'a> ITx<'a> for SledTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result>> { +impl<'a> ITx for SledTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result>> { let tree = self.get_tree(tree)?; let tmp = self.save_error(tree.get(key))?; Ok(tmp.map(From::from)) @@ -227,20 +227,20 @@ impl<'a> ITx<'a> for SledTx<'a> { unimplemented!(".len() in transaction not supported with Sled backend") } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; self.save_error(tree.insert(key, value))?; Ok(()) } - fn remove(&self, tree: usize, key: &[u8]) -> Result { + fn remove(&mut self, tree: usize, key: &[u8]) -> Result { let tree = self.get_tree(tree)?; Ok(self.save_error(tree.remove(key))?.is_some()) } - fn iter(&self, _tree: usize) -> Result> { + fn iter(&self, _tree: usize) -> Result> { unimplemented!("Iterators in transactions not supported with Sled backend"); } - fn iter_rev(&self, _tree: usize) -> Result> { + fn iter_rev(&self, _tree: usize) -> Result> { unimplemented!("Iterators in transactions not supported with Sled backend"); } @@ -249,7 +249,7 @@ impl<'a> ITx<'a> for SledTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result> { + ) -> Result> { unimplemented!("Iterators in transactions not supported with Sled backend"); } fn range_rev<'r>( @@ -257,7 +257,7 @@ impl<'a> ITx<'a> for SledTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result> { + ) -> Result> { unimplemented!("Iterators in transactions not supported with Sled backend"); } } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 4f79b34b..9f2bf919 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -17,7 +17,7 @@ pub use rusqlite; impl From for Error { fn from(e: rusqlite::Error) -> Error { - Error(format!("{}", e).into()) + Error(format!("Sqlite: {}", e).into()) } } @@ -235,11 +235,11 @@ impl IDb for SqliteDb { let mut db = self.db.lock().unwrap(); trace!("transaction: lock acquired"); - let tx = SqliteTx { + let mut tx = SqliteTx { tx: db.transaction()?, trees: trees.as_ref(), }; - let res = match f.try_on(&tx) { + let res = match f.try_on(&mut tx) { TxFnResult::Ok => { tx.tx.commit()?; Ok(()) @@ -278,8 +278,8 @@ impl<'a> SqliteTx<'a> { } } -impl<'a> ITx<'a> for SqliteTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result>> { +impl<'a> ITx for SqliteTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result>> { let tree = self.get_tree(tree)?; let mut stmt = self .tx @@ -300,7 +300,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> { } } - fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; self.tx.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), @@ -308,7 +308,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> { )?; Ok(()) } - fn remove(&self, tree: usize, key: &[u8]) -> Result { + fn remove(&mut self, tree: usize, key: &[u8]) -> Result { let tree = self.get_tree(tree)?; let res = self .tx @@ -316,10 +316,10 @@ impl<'a> ITx<'a> for SqliteTx<'a> { Ok(res > 0) } - fn iter(&self, _tree: usize) -> Result> { + fn iter(&self, _tree: usize) -> Result> { unimplemented!(); } - fn iter_rev(&self, _tree: usize) -> Result> { + fn iter_rev(&self, _tree: usize) -> Result> { unimplemented!(); } @@ -328,7 +328,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result> { + ) -> Result> { unimplemented!(); } fn range_rev<'r>( @@ -336,7 +336,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result> { + ) -> Result> { unimplemented!(); } } diff --git a/src/db/test.rs b/src/db/test.rs index 75200cf1..e5b83ab5 100644 --- a/src/db/test.rs +++ b/src/db/test.rs @@ -1,5 +1,6 @@ use crate::*; +use crate::lmdb_adapter::LmdbDb; use crate::sled_adapter::SledDb; use crate::sqlite_adapter::SqliteDb; @@ -16,7 +17,7 @@ fn test_suite(db: Db) { tree.insert(ka, va).unwrap(); assert_eq!(tree.get(ka).unwrap().unwrap(), va); - let res = db.transaction::<_, (), _>(|tx| { + let res = db.transaction::<_, (), _>(|mut tx| { assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); tx.insert(&tree, ka, vb).unwrap(); @@ -28,7 +29,7 @@ fn test_suite(db: Db) { assert!(matches!(res, Ok(12))); assert_eq!(tree.get(ka).unwrap().unwrap(), vb); - let res = db.transaction::<(), _, _>(|tx| { + let res = db.transaction::<(), _, _>(|mut tx| { assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb); tx.insert(&tree, ka, vc).unwrap(); @@ -78,6 +79,18 @@ fn test_suite(db: Db) { drop(iter); } +#[test] +fn test_lmdb_db() { + let path = mktemp::Temp::new_dir().unwrap(); + let db = lmdb::Environment::new() + .set_max_dbs(100) + .open(&path) + .unwrap(); + let db = LmdbDb::init(db); + test_suite(db); + drop(path); +} + #[test] fn test_sled_db() { let path = mktemp::Temp::new_dir().unwrap(); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9e343e5f..d8c1229a 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -179,7 +179,7 @@ impl IndexCounter { pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.db().transaction(|tx| { + let new_entry = self.local_counter.db().transaction(|mut tx| { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => rmp_serde::decode::from_slice::(&old_bytes) .map_err(Error::RmpDecode) diff --git a/src/table/data.rs b/src/table/data.rs index 17402bb6..cca96f68 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -182,7 +182,7 @@ where tree_key: &[u8], f: impl Fn(Option) -> F::E, ) -> Result, Error> { - let changed = self.store.db().transaction(|tx| { + let changed = self.store.db().transaction(|mut tx| { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; @@ -203,6 +203,7 @@ where .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + drop(old_bytes); if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); @@ -241,15 +242,16 @@ where } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = self.store.db().transaction(|tx| { - if let Some(cur_v) = tx.get(&self.store, k)? { - if cur_v == v { - tx.remove(&self.store, k)?; - tx.insert(&self.merkle_todo, k, vec![])?; - return Ok(true); - } + let removed = self.store.db().transaction(|mut tx| { + let remove = match tx.get(&self.store, k)? { + Some(cur_v) if cur_v == v => true, + _ => false, + }; + if remove { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; } - Ok(false) + Ok(remove) })?; if removed { @@ -267,15 +269,16 @@ where k: &[u8], vhash: Hash, ) -> Result { - let removed = self.store.db().transaction(|tx| { - if let Some(cur_v) = tx.get(&self.store, k)? { - if blake2sum(&cur_v[..]) == vhash { - tx.remove(&self.store, k)?; - tx.insert(&self.merkle_todo, k, vec![])?; - return Ok(Some(cur_v.into_vec())); - } + let removed = self.store.db().transaction(|mut tx| { + let remove_v = match tx.get(&self.store, k)? { + Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v.into_vec()), + _ => None, + }; + if remove_v.is_some() { + tx.remove(&self.store, k)?; + tx.insert(&self.merkle_todo, k, vec![])?; } - Ok(None) + Ok(remove_v) })?; if let Some(old_v) = removed { diff --git a/src/table/gc.rs b/src/table/gc.rs index 260fecfa..e2611389 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -376,13 +376,13 @@ impl GcTodoEntry { /// what we have to do is still the same pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { let key = self.todo_table_key(); - gc_todo_tree.db().transaction(|tx| { - let old_val = tx.get(gc_todo_tree, &key)?; - match old_val { - Some(ov) if ov == self.value_hash.as_slice() => { - tx.remove(gc_todo_tree, &key)?; - } - _ => (), + gc_todo_tree.db().transaction(|mut tx| { + let remove = match tx.get(gc_todo_tree, &key)? { + Some(ov) if ov == self.value_hash.as_slice() => true, + _ => false, + }; + if remove { + tx.remove(gc_todo_tree, &key)?; } tx.commit(()) })?; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 8c574d09..92e1445b 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -137,17 +137,17 @@ where self.data .merkle_tree .db() - .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; - let deleted = self.data.merkle_todo.db().transaction(|tx| { - let old_val = tx.get(&self.data.merkle_todo, k)?; - match old_val { - Some(ov) if ov == vhash_by => { - tx.remove(&self.data.merkle_todo, k)?; - tx.commit(true) - } - _ => tx.commit(false), + let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let remove = match tx.get(&self.data.merkle_todo, k)? { + Some(ov) if ov == vhash_by => true, + _ => false, + }; + if remove { + tx.remove(&self.data.merkle_todo, k)?; } + Ok(remove) })?; if !deleted { @@ -162,7 +162,7 @@ where fn update_item_rec( &self, - tx: db::Transaction<'_>, + tx: &mut db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, @@ -288,7 +288,7 @@ where fn read_node_txn( &self, - tx: db::Transaction<'_>, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, ) -> db::TxResult { let ent = tx.get(&self.data.merkle_tree, k.encode())?; @@ -297,7 +297,7 @@ where fn put_node_txn( &self, - tx: db::Transaction<'_>, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, ) -> db::TxResult {