diff --git a/Cargo.lock b/Cargo.lock index 630642ff..1f81ebec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,6 +987,16 @@ dependencies = [ "zstd", ] +[[package]] +name = "garage_db" +version = "0.8.0" +dependencies = [ + "arc-swap", + "err-derive 0.3.1", + "mktemp", + "sled", +] + [[package]] name = "garage_model" version = "0.5.1" @@ -1130,6 +1140,7 @@ dependencies = [ "bytes 1.1.0", "futures", "futures-util", + "garage_db", "garage_rpc 0.7.0", "garage_util 0.7.0", "hexdump", @@ -1855,6 +1866,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "mktemp" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "975de676448231fcde04b9149d2543077e166b78fc29eae5aa219e7928410da2" +dependencies = [ + "uuid", +] + [[package]] name = "multer" version = "2.0.2" @@ -3489,6 +3509,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index edd0e3f9..122285db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "src/db", "src/util", "src/rpc", "src/table", diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml new file mode 100644 index 00000000..025016d5 --- /dev/null +++ b/src/db/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "garage_db" +version = "0.8.0" +authors = ["Alex Auvolat "] +edition = "2018" +license = "AGPL-3.0" +description = "Abstraction over multiple key/value storage engines that supports transactions" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +err-derive = "0.3" +arc-swap = "1.0" + +sled = "0.34" + +[dev-dependencies] +mktemp = "0.4" diff --git a/src/db/lib.rs b/src/db/lib.rs new file mode 100644 index 00000000..0f23a9b4 --- /dev/null +++ b/src/db/lib.rs @@ -0,0 +1,174 @@ +pub mod sled_adapter; + +#[cfg(test)] +pub mod test; + +use std::borrow::Cow; +use std::sync::Arc; + +use arc_swap::ArcSwapOption; + +#[derive(Clone)] +pub struct Db(pub(crate) Arc); + +#[derive(Clone)] +pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); + +#[derive(Clone)] +pub struct Tree(pub(crate) Arc, pub(crate) usize); + +pub type Value<'a> = Cow<'a, [u8]>; + +// ---- + +#[derive(Debug)] +pub struct Error(Cow<'static, str>); + +pub type Result = std::result::Result; + +#[derive(Debug)] +pub enum TxError { + Abort(E), + Db(Error), +} +pub type TxResult = std::result::Result>; + +impl From for TxError { + fn from(e: Error) -> TxError { + TxError::Db(e) + } +} + +// ---- + +impl Db { + pub fn tree>(&self, name: S) -> Result { + let tree_id = self.0.tree(name.as_ref())?; + Ok(Tree(self.0.clone(), tree_id)) + } + + pub fn transaction(&self, fun: F) -> TxResult + where + F: Fn(Transaction<'_>) -> TxResult + Send + Sync, + R: Send + Sync, + E: Send + Sync, + { + let f = TxFn { + function: fun, + result: ArcSwapOption::new(None), + }; + match self.0.transaction(&f) { + Err(TxError::Db(e)) => Err(TxError::Db(e)), + Err(TxError::Abort(())) => { + let r_arc = f + .result + .into_inner() + .expect("Transaction did not store result"); + let r = Arc::try_unwrap(r_arc).ok().expect("Many refs"); + assert!(matches!(r, Err(TxError::Abort(_)))); + r + } + Ok(()) => { + let r_arc = f + .result + .into_inner() + .expect("Transaction did not store result"); + let r = Arc::try_unwrap(r_arc).ok().expect("Many refs"); + assert!(matches!(r, Ok(_))); + r + } + } + } +} + +impl Tree { + pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result>> { + self.0.get(self.1, key.as_ref()) + } + + pub fn put, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { + self.0.put(self.1, key.as_ref(), value.as_ref()) + } +} + +impl<'a> Transaction<'a> { + pub fn get>(&self, tree: &Tree, key: T) -> Result>> { + self.0.get(tree.1, key.as_ref()) + } + + pub fn put, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> { + self.0.put(tree.1, key.as_ref(), value.as_ref()) + } + + #[must_use] + pub fn abort(self, e: E) -> TxResult + where + R: Send + Sync, + E: Send + Sync, + { + Err(TxError::Abort(e)) + } + + #[must_use] + pub fn commit(self, r: R) -> TxResult + where + R: Send + Sync, + E: Send + Sync, + { + Ok(r) + } +} + +// ---- Internal interfaces + +pub(crate) trait IDb: Send + Sync { + fn tree(&self, name: &str) -> Result; + + fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result>>; + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; +} + +pub(crate) trait ITx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result>>; + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; +} + +pub(crate) trait ITxFn: Send + Sync { + fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; +} + +enum TxFnResult { + Abort, + Ok, + Err, +} + +struct TxFn +where + F: Fn(Transaction<'_>) -> TxResult + Send + Sync, + R: Send + Sync, + E: Send + Sync, +{ + function: F, + result: ArcSwapOption>, +} + +impl ITxFn for TxFn +where + F: Fn(Transaction<'_>) -> TxResult + Send + Sync, + R: Send + Sync, + E: Send + Sync, +{ + fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult { + let res = (self.function)(Transaction(tx)); + let retval = match &res { + Ok(_) => TxFnResult::Ok, + Err(TxError::Abort(_)) => TxFnResult::Abort, + Err(TxError::Db(_)) => TxFnResult::Err, + }; + self.result.store(Some(Arc::new(res))); + retval + } +} diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs new file mode 100644 index 00000000..617b4844 --- /dev/null +++ b/src/db/sled_adapter.rs @@ -0,0 +1,132 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use arc_swap::ArcSwapOption; + +use sled::transaction::{ + ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, UnabortableTransactionError +}; + +use crate::{Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, Db}; + +impl From for Error { + fn from(e: sled::Error) -> Error { + Error(format!("{}", e).into()) + } +} + +pub struct SledDb { + db: sled::Db, + trees: RwLock<(Vec, HashMap)>, +} + +impl SledDb { + pub fn new(db: sled::Db) -> Db { + let s = Self { + db, + trees: RwLock::new((Vec::new(), HashMap::new())), + }; + Db(Arc::new(s)) + } + + fn get_tree(&self, i: usize) -> Result { + self.trees + .read() + .unwrap() + .0 + .get(i) + .cloned() + .ok_or(Error("invalid tree id".into())) + } +} + +impl IDb for SledDb { + fn tree(&self, name: &str) -> Result { + let mut trees = self.trees.write().unwrap(); + if let Some(i) = trees.1.get(name) { + Ok(*i) + } else { + let tree = self.db.open_tree(name)?; + let i = trees.0.len(); + trees.0.push(tree); + trees.1.insert(name.to_string(), i); + Ok(i) + } + } + + fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result>> { + let tree = self.get_tree(tree)?; + Ok(tree.get(key)?.map(|v| v.to_vec().into())) + } + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.get_tree(tree)?; + tree.insert(key, value)?; + Ok(()) + } + + fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { + let trees = self.trees.read().unwrap(); + let res = trees.0.transaction(|txtrees| { + let tx = SledTx { + trees: txtrees, + err: ArcSwapOption::new(None), + }; + match f.try_on(&tx) { + TxFnResult::Ok => { + assert!(tx.err.into_inner().is_none()); + Ok(()) + } + TxFnResult::Abort => Err(ConflictableTransactionError::Abort(())), + TxFnResult::Err => { + let err_arc = tx + .err + .into_inner() + .expect("Transaction did not store error"); + let err = Arc::try_unwrap(err_arc).ok().expect("Many refs"); + Err(err.into()) + } + } + }); + match res { + Ok(()) => Ok(()), + Err(TransactionError::Abort(())) => Err(TxError::Abort(())), + Err(TransactionError::Storage(s)) => Err(TxError::Db(s.into())), + } + } +} + +// ---- + +struct SledTx<'a> { + trees: &'a [TransactionalTree], + err: ArcSwapOption, +} + +impl<'a> SledTx<'a> { + fn save_error(&self, v: std::result::Result) -> Result { + match v { + Ok(x) => Ok(x), + Err(e) => { + let txt = format!("{}", e); + self.err.store(Some(Arc::new(e))); + Err(Error(txt.into())) + } + } + } +} + +impl<'a> ITx<'a> for SledTx<'a> { + fn get(&self, tree: usize, key: &[u8]) -> Result>> { + let tree = self.trees.get(tree) + .ok_or(Error("invalid tree id".into()))?; + let tmp = self.save_error(tree.get(key))?; + Ok(tmp.map(|v| v.to_vec().into())) + } + + fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { + let tree = self.trees.get(tree) + .ok_or(Error("invalid tree id".into()))?; + self.save_error(tree.insert(key, value))?; + Ok(()) + } +} diff --git a/src/db/test.rs b/src/db/test.rs new file mode 100644 index 00000000..f0e6c5de --- /dev/null +++ b/src/db/test.rs @@ -0,0 +1,49 @@ +use crate::*; + +use crate::sled_adapter::SledDb; + +fn test_suite(db: Db) -> Result<()> { + let tree = db.tree("tree")?; + + let va: &[u8] = &b"plop"[..]; + let vb: &[u8] = &b"plip"[..]; + let vc: &[u8] = &b"plup"[..]; + + tree.put(b"test", va)?; + assert_eq!(tree.get(b"test")?, Some(va.into())); + + let res = db.transaction::<_, (), _>(|tx| { + assert_eq!(tx.get(&tree, b"test")?, Some(va.into())); + + tx.put(&tree, b"test", vb)?; + + assert_eq!(tx.get(&tree, b"test")?, Some(vb.into())); + + tx.commit(12) + }); + assert!(matches!(res, Ok(12))); + assert_eq!(tree.get(b"test")?, Some(vb.into())); + + let res = db.transaction::<(), _, _>(|tx| { + assert_eq!(tx.get(&tree, b"test")?, Some(vb.into())); + + tx.put(&tree, b"test", vc)?; + + assert_eq!(tx.get(&tree, b"test")?, Some(vc.into())); + + tx.abort(42) + }); + assert!(matches!(res, Err(TxError::Abort(42)))); + assert_eq!(tree.get(b"test")?, Some(vb.into())); + + Ok(()) +} + +#[test] +fn test_sled_db() -> Result<()> { + let path = mktemp::Temp::new_dir().unwrap(); + let db = SledDb::new(sled::open(path.to_path_buf()).unwrap()); + test_suite(db)?; + drop(path); + Ok(()) +} diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index ed1a213f..6ae50366 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" }