Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
4 changed files with 27 additions and 43 deletions
Showing only changes of commit f29b91232f - Show all commits

1
Cargo.lock generated
View file

@ -991,7 +991,6 @@ dependencies = [
name = "garage_db" name = "garage_db"
version = "0.8.0" version = "0.8.0"
dependencies = [ dependencies = [
"arc-swap",
"err-derive 0.3.1", "err-derive 0.3.1",
"mktemp", "mktemp",
"sled", "sled",

View file

@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies] [dependencies]
err-derive = "0.3" err-derive = "0.3"
arc-swap = "1.0"
sled = "0.34" sled = "0.34"

View file

@ -6,9 +6,9 @@ pub mod test;
use core::ops::{Bound, RangeBounds}; use core::ops::{Bound, RangeBounds};
use std::borrow::Cow; use std::borrow::Cow;
use std::cell::Cell;
use std::sync::Arc; use std::sync::Arc;
use arc_swap::ArcSwapOption;
use err_derive::Error; use err_derive::Error;
#[derive(Clone)] #[derive(Clone)]
@ -25,7 +25,7 @@ pub type ValueIter<'a> =
Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + Sync + 'a>; Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + Sync + 'a>;
pub type Exporter<'a> = pub type Exporter<'a> =
Box<dyn std::iter::Iterator<Item=Result<(String, ValueIter<'a>)>> + Send + Sync + 'a>; Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + Send + Sync + 'a>;
// ---- // ----
@ -59,30 +59,26 @@ impl Db {
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync,
E: Send + Sync,
{ {
let f = TxFn { let f = TxFn {
function: fun, function: fun,
result: ArcSwapOption::new(None), result: Cell::new(None),
}; };
match self.0.transaction(&f) { match self.0.transaction(&f) {
Err(TxError::Db(e)) => Err(TxError::Db(e)), Err(TxError::Db(e)) => Err(TxError::Db(e)),
Err(TxError::Abort(())) => { Err(TxError::Abort(())) => {
let r_arc = f let r = f
.result .result
.into_inner() .into_inner()
.expect("Transaction did not store result"); .expect("Transaction did not store result");
let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
assert!(matches!(r, Err(TxError::Abort(_)))); assert!(matches!(r, Err(TxError::Abort(_))));
r r
} }
Ok(()) => { Ok(()) => {
let r_arc = f let r = f
.result .result
.into_inner() .into_inner()
.expect("Transaction did not store result"); .expect("Transaction did not store result");
let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
assert!(matches!(r, Ok(_))); assert!(matches!(r, Ok(_)));
r r
} }
@ -186,18 +182,12 @@ impl<'a> Transaction<'a> {
#[must_use] #[must_use]
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> pub fn abort<R, E>(self, e: E) -> TxResult<R, E>
where
R: Send + Sync,
E: Send + Sync,
{ {
Err(TxError::Abort(e)) Err(TxError::Abort(e))
} }
#[must_use] #[must_use]
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> pub fn commit<R, E>(self, r: R) -> TxResult<R, E>
where
R: Send + Sync,
E: Send + Sync,
{ {
Ok(r) Ok(r)
} }
@ -270,18 +260,14 @@ enum TxFnResult {
struct TxFn<F, R, E> struct TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync,
E: Send + Sync,
{ {
function: F, function: F,
result: ArcSwapOption<TxResult<R, E>>, result: Cell<Option<TxResult<R, E>>>,
} }
impl<F, R, E> ITxFn for TxFn<F, R, E> impl<F, R, E> ITxFn for TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E>, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync,
E: Send + Sync,
{ {
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult { fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult {
let res = (self.function)(Transaction(tx)); let res = (self.function)(Transaction(tx));
@ -290,7 +276,7 @@ where
Err(TxError::Abort(_)) => TxFnResult::Abort, Err(TxError::Abort(_)) => TxFnResult::Abort,
Err(TxError::Db(_)) => TxFnResult::Err, Err(TxError::Db(_)) => TxFnResult::Err,
}; };
self.result.store(Some(Arc::new(res))); self.result.set(Some(res));
retval retval
} }
} }

View file

@ -1,16 +1,17 @@
use core::ops::Bound; use core::ops::Bound;
use std::cell::Cell;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use arc_swap::ArcSwapOption;
use sled::transaction::{ use sled::transaction::{
ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
UnabortableTransactionError, UnabortableTransactionError,
}; };
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter, Exporter}; use crate::{
Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
};
pub use sled; pub use sled;
@ -47,14 +48,14 @@ impl SledDb {
pub fn export<'a>(&'a self) -> Result<Exporter<'a>> { pub fn export<'a>(&'a self) -> Result<Exporter<'a>> {
let mut trees = vec![]; let mut trees = vec![];
for name in self.db.tree_names() { for name in self.db.tree_names() {
let name = std::str::from_utf8(&name).map_err(|e| Error(format!("{}", e).into()))?.to_string(); let name = std::str::from_utf8(&name)
.map_err(|e| Error(format!("{}", e).into()))?
.to_string();
let tree = self.open_tree(&name)?; let tree = self.open_tree(&name)?;
let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone(); let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
trees.push((name, tree)); trees.push((name, tree));
} }
let trees_exporter: Exporter<'a> = Box::new(trees let trees_exporter: Exporter<'a> = Box::new(trees.into_iter().map(|(name, tree)| {
.into_iter()
.map(|(name, tree)| {
let iter: ValueIter<'a> = Box::new(tree.iter().map(|v| { let iter: ValueIter<'a> = Box::new(tree.iter().map(|v| {
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
.map_err(Into::into) .map_err(Into::into)
@ -172,7 +173,7 @@ impl IDb for SledDb {
let res = trees.0.transaction(|txtrees| { let res = trees.0.transaction(|txtrees| {
let tx = SledTx { let tx = SledTx {
trees: txtrees, trees: txtrees,
err: ArcSwapOption::new(None), err: Cell::new(None),
}; };
match f.try_on(&tx) { match f.try_on(&tx) {
TxFnResult::Ok => { TxFnResult::Ok => {
@ -184,11 +185,10 @@ impl IDb for SledDb {
Err(ConflictableTransactionError::Abort(())) Err(ConflictableTransactionError::Abort(()))
} }
TxFnResult::Err => { TxFnResult::Err => {
let err_arc = tx let err = tx
.err .err
.into_inner() .into_inner()
.expect("Transaction did not store error"); .expect("Transaction did not store error");
let err = Arc::try_unwrap(err_arc).ok().expect("Many refs");
Err(err.into()) Err(err.into())
} }
} }
@ -205,14 +205,14 @@ impl IDb for SledDb {
struct SledTx<'a> { struct SledTx<'a> {
trees: &'a [TransactionalTree], trees: &'a [TransactionalTree],
err: ArcSwapOption<UnabortableTransactionError>, err: Cell<Option<UnabortableTransactionError>>,
} }
impl<'a> SledTx<'a> { impl<'a> SledTx<'a> {
fn get_tree(&self, i: usize) -> Result<&TransactionalTree> { fn get_tree(&self, i: usize) -> Result<&TransactionalTree> {
self.trees self.trees.get(i).ok_or(Error(
.get(i) "invalid tree id (it might have been openned after the transaction started)".into(),
.ok_or(Error("invalid tree id (it might have been openned after the transaction started)".into())) ))
} }
fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> { fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> {
@ -220,7 +220,7 @@ impl<'a> SledTx<'a> {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(e) => { Err(e) => {
let txt = format!("{}", e); let txt = format!("{}", e);
self.err.store(Some(Arc::new(e))); self.err.set(Some(e));
Err(Error(txt.into())) Err(Error(txt.into()))
} }
} }