Abstract database behind generic interface and implement alternative drivers #322
7 changed files with 409 additions and 0 deletions
29
Cargo.lock
generated
29
Cargo.lock
generated
|
@ -987,6 +987,16 @@ dependencies = [
|
||||||
"zstd",
|
"zstd",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "garage_db"
|
||||||
|
version = "0.8.0"
|
||||||
|
dependencies = [
|
||||||
|
"arc-swap",
|
||||||
|
"err-derive 0.3.1",
|
||||||
|
"mktemp",
|
||||||
|
"sled",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "garage_model"
|
name = "garage_model"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
|
@ -1130,6 +1140,7 @@ dependencies = [
|
||||||
"bytes 1.1.0",
|
"bytes 1.1.0",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"garage_db",
|
||||||
"garage_rpc 0.7.0",
|
"garage_rpc 0.7.0",
|
||||||
"garage_util 0.7.0",
|
"garage_util 0.7.0",
|
||||||
"hexdump",
|
"hexdump",
|
||||||
|
@ -1855,6 +1866,15 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mktemp"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "975de676448231fcde04b9149d2543077e166b78fc29eae5aa219e7928410da2"
|
||||||
|
dependencies = [
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "multer"
|
name = "multer"
|
||||||
version = "2.0.2"
|
version = "2.0.2"
|
||||||
|
@ -3489,6 +3509,15 @@ dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "uuid"
|
||||||
|
version = "0.8.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vcpkg"
|
name = "vcpkg"
|
||||||
version = "0.2.15"
|
version = "0.2.15"
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
|
"src/db",
|
||||||
"src/util",
|
"src/util",
|
||||||
"src/rpc",
|
"src/rpc",
|
||||||
"src/table",
|
"src/table",
|
||||||
|
|
23
src/db/Cargo.toml
Normal file
23
src/db/Cargo.toml
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
[package]
|
||||||
|
name = "garage_db"
|
||||||
|
version = "0.8.0"
|
||||||
|
authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||||
|
edition = "2018"
|
||||||
|
license = "AGPL-3.0"
|
||||||
|
description = "Abstraction over multiple key/value storage engines that supports transactions"
|
||||||
|
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
|
||||||
|
readme = "../../README.md"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
path = "lib.rs"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
err-derive = "0.3"
|
||||||
|
arc-swap = "1.0"
|
||||||
|
|
||||||
|
sled = "0.34"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
mktemp = "0.4"
|
174
src/db/lib.rs
Normal file
174
src/db/lib.rs
Normal file
|
@ -0,0 +1,174 @@
|
||||||
|
pub mod sled_adapter;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub mod test;
|
||||||
|
|
||||||
|
use std::borrow::Cow;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arc_swap::ArcSwapOption;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Db(pub(crate) Arc<dyn IDb>);
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
|
||||||
|
|
||||||
|
pub type Value<'a> = Cow<'a, [u8]>;
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Error(Cow<'static, str>);
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum TxError<E> {
|
||||||
|
Abort(E),
|
||||||
|
Db(Error),
|
||||||
|
}
|
||||||
|
pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
|
||||||
|
|
||||||
|
impl<E> From<Error> for TxError<E> {
|
||||||
|
fn from(e: Error) -> TxError<E> {
|
||||||
|
TxError::Db(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
impl Db {
|
||||||
|
pub fn tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
|
||||||
|
let tree_id = self.0.tree(name.as_ref())?;
|
||||||
|
Ok(Tree(self.0.clone(), tree_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
|
||||||
|
where
|
||||||
|
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
|
||||||
|
R: Send + Sync,
|
||||||
|
E: Send + Sync,
|
||||||
|
{
|
||||||
|
let f = TxFn {
|
||||||
|
function: fun,
|
||||||
|
result: ArcSwapOption::new(None),
|
||||||
|
};
|
||||||
|
match self.0.transaction(&f) {
|
||||||
|
Err(TxError::Db(e)) => Err(TxError::Db(e)),
|
||||||
|
Err(TxError::Abort(())) => {
|
||||||
|
let r_arc = f
|
||||||
|
.result
|
||||||
|
.into_inner()
|
||||||
|
.expect("Transaction did not store result");
|
||||||
|
let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
|
||||||
|
assert!(matches!(r, Err(TxError::Abort(_))));
|
||||||
|
r
|
||||||
|
}
|
||||||
|
Ok(()) => {
|
||||||
|
let r_arc = f
|
||||||
|
.result
|
||||||
|
.into_inner()
|
||||||
|
.expect("Transaction did not store result");
|
||||||
|
let r = Arc::try_unwrap(r_arc).ok().expect("Many refs");
|
||||||
|
assert!(matches!(r, Ok(_)));
|
||||||
|
r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tree {
|
||||||
|
pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> {
|
||||||
|
self.0.get(self.1, key.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
|
||||||
|
self.0.put(self.1, key.as_ref(), value.as_ref())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Transaction<'a> {
|
||||||
|
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> {
|
||||||
|
self.0.get(tree.1, key.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> {
|
||||||
|
self.0.put(tree.1, key.as_ref(), value.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn abort<R, E>(self, e: E) -> TxResult<R, E>
|
||||||
|
where
|
||||||
|
R: Send + Sync,
|
||||||
|
E: Send + Sync,
|
||||||
|
{
|
||||||
|
Err(TxError::Abort(e))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn commit<R, E>(self, r: R) -> TxResult<R, E>
|
||||||
|
where
|
||||||
|
R: Send + Sync,
|
||||||
|
E: Send + Sync,
|
||||||
|
{
|
||||||
|
Ok(r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- Internal interfaces
|
||||||
|
|
||||||
|
pub(crate) trait IDb: Send + Sync {
|
||||||
|
fn tree(&self, name: &str) -> Result<usize>;
|
||||||
|
|
||||||
|
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
|
||||||
|
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
|
||||||
|
|
||||||
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) trait ITx<'a> {
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
|
||||||
|
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) trait ITxFn: Send + Sync {
|
||||||
|
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum TxFnResult {
|
||||||
|
Abort,
|
||||||
|
Ok,
|
||||||
|
Err,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TxFn<F, R, E>
|
||||||
|
where
|
||||||
|
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
|
||||||
|
R: Send + Sync,
|
||||||
|
E: Send + Sync,
|
||||||
|
{
|
||||||
|
function: F,
|
||||||
|
result: ArcSwapOption<TxResult<R, E>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, R, E> ITxFn for TxFn<F, R, E>
|
||||||
|
where
|
||||||
|
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync,
|
||||||
|
R: Send + Sync,
|
||||||
|
E: Send + Sync,
|
||||||
|
{
|
||||||
|
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult {
|
||||||
|
let res = (self.function)(Transaction(tx));
|
||||||
|
let retval = match &res {
|
||||||
|
Ok(_) => TxFnResult::Ok,
|
||||||
|
Err(TxError::Abort(_)) => TxFnResult::Abort,
|
||||||
|
Err(TxError::Db(_)) => TxFnResult::Err,
|
||||||
|
};
|
||||||
|
self.result.store(Some(Arc::new(res)));
|
||||||
|
retval
|
||||||
|
}
|
||||||
|
}
|
132
src/db/sled_adapter.rs
Normal file
132
src/db/sled_adapter.rs
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
|
use arc_swap::ArcSwapOption;
|
||||||
|
|
||||||
|
use sled::transaction::{
|
||||||
|
ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, UnabortableTransactionError
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, Db};
|
||||||
|
|
||||||
|
impl From<sled::Error> for Error {
|
||||||
|
fn from(e: sled::Error) -> Error {
|
||||||
|
Error(format!("{}", e).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SledDb {
|
||||||
|
db: sled::Db,
|
||||||
|
trees: RwLock<(Vec<sled::Tree>, HashMap<String, usize>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SledDb {
|
||||||
|
pub fn new(db: sled::Db) -> Db {
|
||||||
|
let s = Self {
|
||||||
|
db,
|
||||||
|
trees: RwLock::new((Vec::new(), HashMap::new())),
|
||||||
|
};
|
||||||
|
Db(Arc::new(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_tree(&self, i: usize) -> Result<sled::Tree> {
|
||||||
|
self.trees
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.0
|
||||||
|
.get(i)
|
||||||
|
.cloned()
|
||||||
|
.ok_or(Error("invalid tree id".into()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IDb for SledDb {
|
||||||
|
fn tree(&self, name: &str) -> Result<usize> {
|
||||||
|
let mut trees = self.trees.write().unwrap();
|
||||||
|
if let Some(i) = trees.1.get(name) {
|
||||||
|
Ok(*i)
|
||||||
|
} else {
|
||||||
|
let tree = self.db.open_tree(name)?;
|
||||||
|
let i = trees.0.len();
|
||||||
|
trees.0.push(tree);
|
||||||
|
trees.1.insert(name.to_string(), i);
|
||||||
|
Ok(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
|
||||||
|
let tree = self.get_tree(tree)?;
|
||||||
|
Ok(tree.get(key)?.map(|v| v.to_vec().into()))
|
||||||
|
}
|
||||||
|
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
let tree = self.get_tree(tree)?;
|
||||||
|
tree.insert(key, value)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
||||||
|
let trees = self.trees.read().unwrap();
|
||||||
|
let res = trees.0.transaction(|txtrees| {
|
||||||
|
let tx = SledTx {
|
||||||
|
trees: txtrees,
|
||||||
|
err: ArcSwapOption::new(None),
|
||||||
|
};
|
||||||
|
match f.try_on(&tx) {
|
||||||
|
TxFnResult::Ok => {
|
||||||
|
assert!(tx.err.into_inner().is_none());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
TxFnResult::Abort => Err(ConflictableTransactionError::Abort(())),
|
||||||
|
TxFnResult::Err => {
|
||||||
|
let err_arc = tx
|
||||||
|
.err
|
||||||
|
.into_inner()
|
||||||
|
.expect("Transaction did not store error");
|
||||||
|
let err = Arc::try_unwrap(err_arc).ok().expect("Many refs");
|
||||||
|
Err(err.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
match res {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(TransactionError::Abort(())) => Err(TxError::Abort(())),
|
||||||
|
Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
struct SledTx<'a> {
|
||||||
|
trees: &'a [TransactionalTree],
|
||||||
|
err: ArcSwapOption<UnabortableTransactionError>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> SledTx<'a> {
|
||||||
|
fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> {
|
||||||
|
match v {
|
||||||
|
Ok(x) => Ok(x),
|
||||||
|
Err(e) => {
|
||||||
|
let txt = format!("{}", e);
|
||||||
|
self.err.store(Some(Arc::new(e)));
|
||||||
|
Err(Error(txt.into()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ITx<'a> for SledTx<'a> {
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
|
||||||
|
let tree = self.trees.get(tree)
|
||||||
|
.ok_or(Error("invalid tree id".into()))?;
|
||||||
|
let tmp = self.save_error(tree.get(key))?;
|
||||||
|
Ok(tmp.map(|v| v.to_vec().into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
let tree = self.trees.get(tree)
|
||||||
|
.ok_or(Error("invalid tree id".into()))?;
|
||||||
|
self.save_error(tree.insert(key, value))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
49
src/db/test.rs
Normal file
49
src/db/test.rs
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
use crate::*;
|
||||||
|
|
||||||
|
use crate::sled_adapter::SledDb;
|
||||||
|
|
||||||
|
fn test_suite(db: Db) -> Result<()> {
|
||||||
|
let tree = db.tree("tree")?;
|
||||||
|
|
||||||
|
let va: &[u8] = &b"plop"[..];
|
||||||
|
let vb: &[u8] = &b"plip"[..];
|
||||||
|
let vc: &[u8] = &b"plup"[..];
|
||||||
|
|
||||||
|
tree.put(b"test", va)?;
|
||||||
|
assert_eq!(tree.get(b"test")?, Some(va.into()));
|
||||||
|
|
||||||
|
let res = db.transaction::<_, (), _>(|tx| {
|
||||||
|
assert_eq!(tx.get(&tree, b"test")?, Some(va.into()));
|
||||||
|
|
||||||
|
tx.put(&tree, b"test", vb)?;
|
||||||
|
|
||||||
|
assert_eq!(tx.get(&tree, b"test")?, Some(vb.into()));
|
||||||
|
|
||||||
|
tx.commit(12)
|
||||||
|
});
|
||||||
|
assert!(matches!(res, Ok(12)));
|
||||||
|
assert_eq!(tree.get(b"test")?, Some(vb.into()));
|
||||||
|
|
||||||
|
let res = db.transaction::<(), _, _>(|tx| {
|
||||||
|
assert_eq!(tx.get(&tree, b"test")?, Some(vb.into()));
|
||||||
|
|
||||||
|
tx.put(&tree, b"test", vc)?;
|
||||||
|
|
||||||
|
assert_eq!(tx.get(&tree, b"test")?, Some(vc.into()));
|
||||||
|
|
||||||
|
tx.abort(42)
|
||||||
|
});
|
||||||
|
assert!(matches!(res, Err(TxError::Abort(42))));
|
||||||
|
assert_eq!(tree.get(b"test")?, Some(vb.into()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sled_db() -> Result<()> {
|
||||||
|
let path = mktemp::Temp::new_dir().unwrap();
|
||||||
|
let db = SledDb::new(sled::open(path.to_path_buf()).unwrap());
|
||||||
|
test_suite(db)?;
|
||||||
|
drop(path);
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ path = "lib.rs"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
garage_db = { version = "0.8.0", path = "../db" }
|
||||||
garage_rpc = { version = "0.7.0", path = "../rpc" }
|
garage_rpc = { version = "0.7.0", path = "../rpc" }
|
||||||
garage_util = { version = "0.7.0", path = "../util" }
|
garage_util = { version = "0.7.0", path = "../util" }
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue