Merge pull request 'Refactor db transactions and add on_commit for table.queue_insert' () from k2v-indices-lmdb into next

Reviewed-on: 
This commit is contained in:
Alex 2023-09-21 14:03:35 +00:00
commit 1d986bd889
14 changed files with 91 additions and 87 deletions

View file

@ -56,7 +56,7 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
self.rc.db().transaction(|mut tx| {
self.rc.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
@ -64,7 +64,7 @@ impl BlockRc {
}
_ => (),
};
tx.commit(())
Ok(())
})?;
Ok(())
}

View file

@ -85,7 +85,7 @@ impl CountedTree {
let old_some = expected_old.is_some();
let new_some = new.is_some();
let tx_res = self.0.tree.db().transaction(|mut tx| {
let tx_res = self.0.tree.db().transaction(|tx| {
let old_val = tx.get(&self.0.tree, &key)?;
let is_same = match (&old_val, &expected_old) {
(None, None) => true,
@ -101,9 +101,9 @@ impl CountedTree {
tx.remove(&self.0.tree, &key)?;
}
}
tx.commit(())
Ok(())
} else {
tx.abort(())
Err(TxError::Abort(()))
}
});

View file

@ -22,10 +22,15 @@ use std::sync::Arc;
use err_derive::Error;
pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
pub struct Transaction<'a>(&'a mut dyn ITx);
pub struct Transaction<'a> {
tx: &'a mut dyn ITx,
on_commit: OnCommit,
}
#[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize);
@ -85,7 +90,7 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where
F: Fn(Transaction<'_>) -> TxResult<R, E>,
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
let f = TxFn {
function: fun,
@ -98,14 +103,17 @@ impl Db {
.expect("Transaction did not store result");
match tx_res {
Ok(()) => {
assert!(matches!(ret, Ok(_)));
ret
}
Err(TxError::Abort(())) => {
assert!(matches!(ret, Err(TxError::Abort(_))));
ret
Ok(on_commit) => match ret {
Ok(value) => {
on_commit.into_iter().for_each(|f| f());
Ok(value)
}
_ => unreachable!(),
},
Err(TxError::Abort(())) => match ret {
Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
_ => unreachable!(),
},
Err(TxError::Db(e2)) => match ret {
// Ok was stored -> the error occured when finalizing
// transaction
@ -139,7 +147,7 @@ impl Db {
let ex_tree = other.open_tree(&name)?;
let tx_res = self.transaction(|mut tx| {
let tx_res = self.transaction(|tx| {
let mut i = 0;
for item in ex_tree.iter().map_err(TxError::Abort)? {
let (k, v) = item.map_err(TxError::Abort)?;
@ -149,7 +157,7 @@ impl Db {
println!("{}: imported {}", name, i);
}
}
tx.commit(i)
Ok(i)
});
let total = match tx_res {
Err(TxError::Db(e)) => return Err(e),
@ -249,11 +257,11 @@ impl Tree {
impl<'a> Transaction<'a> {
#[inline]
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.get(tree.1, key.as_ref())
self.tx.get(tree.1, key.as_ref())
}
#[inline]
pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
self.0.len(tree.1)
self.tx.len(tree.1)
}
/// Returns the old value if there was one
@ -264,21 +272,21 @@ impl<'a> Transaction<'a> {
key: T,
value: U,
) -> TxOpResult<Option<Value>> {
self.0.insert(tree.1, key.as_ref(), value.as_ref())
self.tx.insert(tree.1, key.as_ref(), value.as_ref())
}
/// Returns the old value if there was one
#[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
self.0.remove(tree.1, key.as_ref())
self.tx.remove(tree.1, key.as_ref())
}
#[inline]
pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
self.0.iter(tree.1)
self.tx.iter(tree.1)
}
#[inline]
pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
self.0.iter_rev(tree.1)
self.tx.iter_rev(tree.1)
}
#[inline]
@ -289,7 +297,7 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range(tree.1, get_bound(sb), get_bound(eb))
self.tx.range(tree.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
@ -299,19 +307,12 @@ impl<'a> Transaction<'a> {
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
// ----
#[inline]
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
Err(TxError::Abort(e))
self.tx.range_rev(tree.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
Ok(r)
pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
self.on_commit.push(Box::new(f));
}
}
@ -348,7 +349,7 @@ pub(crate) trait IDb: Send + Sync {
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
}
pub(crate) trait ITx {
@ -380,14 +381,14 @@ pub(crate) trait ITxFn {
}
pub(crate) enum TxFnResult {
Ok,
Ok(OnCommit),
Abort,
DbErr,
}
struct TxFn<F, R, E>
where
F: Fn(Transaction<'_>) -> TxResult<R, E>,
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
function: F,
result: Cell<Option<TxResult<R, E>>>,
@ -395,12 +396,16 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E>
where
F: Fn(Transaction<'_>) -> TxResult<R, E>,
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
let res = (self.function)(Transaction(tx));
let mut tx = Transaction {
tx,
on_commit: vec![],
};
let res = (self.function)(&mut tx);
let res2 = match &res {
Ok(_) => TxFnResult::Ok,
Ok(_) => TxFnResult::Ok(tx.on_commit),
Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::DbErr,
};

View file

@ -9,8 +9,8 @@ use heed::types::ByteSlice;
use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database};
use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
TxValueIter, Value, ValueIter,
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use heed;
@ -186,7 +186,7 @@ impl IDb for LmdbDb {
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let mut tx = LmdbTx {
trees: &trees.0[..],
@ -199,9 +199,9 @@ impl IDb for LmdbDb {
let res = f.try_on(&mut tx);
match res {
TxFnResult::Ok => {
TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(())
Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?;

View file

@ -10,8 +10,8 @@ use sled::transaction::{
};
use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
TxValueIter, Value, ValueIter,
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use sled;
@ -166,7 +166,7 @@ impl IDb for SledDb {
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let trees = self.trees.read().unwrap();
let res = trees.0.transaction(|txtrees| {
let mut tx = SledTx {
@ -174,9 +174,9 @@ impl IDb for SledDb {
err: Cell::new(None),
};
match f.try_on(&mut tx) {
TxFnResult::Ok => {
TxFnResult::Ok(on_commit) => {
assert!(tx.err.into_inner().is_none());
Ok(())
Ok(on_commit)
}
TxFnResult::Abort => {
assert!(tx.err.into_inner().is_none());
@ -189,7 +189,7 @@ impl IDb for SledDb {
}
});
match res {
Ok(()) => Ok(()),
Ok(on_commit) => Ok(on_commit),
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
}

View file

@ -9,8 +9,8 @@ use std::sync::{Arc, Mutex, MutexGuard};
use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{
Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult,
TxValueIter, Value, ValueIter,
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub use rusqlite;
@ -261,7 +261,7 @@ impl IDb for SqliteDb {
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
trace!("transaction: lock db");
let mut this = self.0.lock().unwrap();
trace!("transaction: lock acquired");
@ -277,9 +277,9 @@ impl IDb for SqliteDb {
trees: &this_mut_ref.trees,
};
let res = match f.try_on(&mut tx) {
TxFnResult::Ok => {
TxFnResult::Ok(on_commit) => {
tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(())
Ok(on_commit)
}
TxFnResult::Abort => {
tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?;

View file

@ -13,26 +13,26 @@ fn test_suite(db: Db) {
assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va);
let res = db.transaction::<_, (), _>(|mut tx| {
let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
assert_eq!(tx.insert(&tree, ka, vb).unwrap().unwrap(), va);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
tx.commit(12)
Ok(12)
});
assert!(matches!(res, Ok(12)));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
let res = db.transaction::<(), _, _>(|mut tx| {
let res = db.transaction::<(), _, _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vb);
assert_eq!(tx.insert(&tree, ka, vc).unwrap().unwrap(), vb);
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), vc);
tx.abort(42)
Err(TxError::Abort(42))
});
assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb);

View file

@ -52,7 +52,7 @@ impl Instance {
r#"
metadata_dir = "{path}/meta"
data_dir = "{path}/data"
db_engine = "sled"
db_engine = "lmdb"
replication_mode = "1"

View file

@ -44,6 +44,7 @@ async fn test_items_and_indices() {
let content = format!("{}: hello world", sk).into_bytes();
let content2 = format!("{}: hello universe", sk).into_bytes();
let content3 = format!("{}: concurrent value", sk).into_bytes();
eprintln!("test iteration {}: {}", i, sk);
// Put initially, no causality token
let res = ctx
@ -89,7 +90,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@ -158,7 +159,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content2);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@ -230,7 +231,7 @@ async fn test_items_and_indices() {
);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@ -299,7 +300,7 @@ async fn test_items_and_indices() {
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request

View file

@ -294,7 +294,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter
.db()
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(local_counter_k);
}
@ -360,7 +360,7 @@ impl<T: CountedItem> IndexCounter<T> {
let counter_entry = local_counter.into_counter_entry(self.this_node);
self.local_counter
.db()
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
.transaction(|tx| self.table.queue_insert(tx, &counter_entry))?;
next_start = Some(counted_entry_k);
}

View file

@ -330,9 +330,7 @@ async fn process_object(
"Lifecycle: expiring 1 object in bucket {:?}",
object.bucket_id
);
db.transaction(|mut tx| {
garage.object_table.queue_insert(&mut tx, &deleted_object)
})?;
db.transaction(|tx| garage.object_table.queue_insert(tx, &deleted_object))?;
*objects_expired += 1;
}
}
@ -365,9 +363,7 @@ async fn process_object(
);
let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
db.transaction(|mut tx| {
garage.object_table.queue_insert(&mut tx, &aborted_object)
})?;
db.transaction(|tx| garage.object_table.queue_insert(tx, &aborted_object))?;
*mpu_aborted += n_aborted;
}
}

View file

@ -34,7 +34,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Notify,
pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: CountedTree,
@ -80,7 +80,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
merkle_todo,
merkle_todo_notify: Notify::new(),
insert_queue,
insert_queue_notify: Notify::new(),
insert_queue_notify: Arc::new(Notify::new()),
gc_todo,
metrics,
})
@ -203,14 +203,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
let changed = self.store.db().transaction(|mut tx| {
let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
let new_entry = update_fn(tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
None => (None, None, update_fn(&mut tx, None)?),
None => (None, None, update_fn(tx, None)?),
};
// Changed can be true in two scenarios
@ -233,7 +233,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
.updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash)))
} else {
@ -270,14 +270,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@ -298,14 +298,14 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let removed = self
.store
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
@ -339,7 +339,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
self.insert_queue_notify.notify_one();
let notif = self.insert_queue_notify.clone();
tx.on_commit(move || notif.notify_one());
Ok(())
}

View file

@ -108,9 +108,9 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
self.data
.merkle_tree
.db()
.transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?;
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|mut tx| {
let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;

View file

@ -53,7 +53,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
self.0.insert_many(values).await?;
self.0.data.insert_queue.db().transaction(|mut tx| {
self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() {
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
if &v2 == v {