Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
26 changed files with 355 additions and 214 deletions
Showing only changes of commit 9f0f5b2e37 - Show all commits

8
Cargo.lock generated
View file

@ -889,6 +889,7 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"garage_api", "garage_api",
"garage_db",
"garage_model 0.7.0", "garage_model 0.7.0",
"garage_rpc 0.7.0", "garage_rpc 0.7.0",
"garage_table 0.7.0", "garage_table 0.7.0",
@ -972,6 +973,7 @@ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_db",
"garage_rpc 0.7.0", "garage_rpc 0.7.0",
"garage_table 0.7.0", "garage_table 0.7.0",
"garage_util 0.7.0", "garage_util 0.7.0",
@ -981,7 +983,6 @@ dependencies = [
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"sled",
"tokio", "tokio",
"tracing", "tracing",
"zstd", "zstd",
@ -1034,6 +1035,7 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"garage_block", "garage_block",
"garage_db",
"garage_model 0.5.1", "garage_model 0.5.1",
"garage_rpc 0.7.0", "garage_rpc 0.7.0",
"garage_table 0.7.0", "garage_table 0.7.0",
@ -1045,7 +1047,6 @@ dependencies = [
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"sled",
"tokio", "tokio",
"tracing", "tracing",
"zstd", "zstd",
@ -1149,7 +1150,6 @@ dependencies = [
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"sled",
"tokio", "tokio",
"tracing", "tracing",
] ]
@ -1188,6 +1188,7 @@ dependencies = [
"chrono", "chrono",
"err-derive 0.3.1", "err-derive 0.3.1",
"futures", "futures",
"garage_db",
"hex", "hex",
"http", "http",
"hyper", "hyper",
@ -1198,7 +1199,6 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2",
"sled",
"tokio", "tokio",
"toml", "toml",
"tracing", "tracing",

View file

@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" } garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
@ -27,8 +28,6 @@ tracing = "0.1.30"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
sled = "0.34"
rmp-serde = "0.15" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -17,10 +17,11 @@ use opentelemetry::{
Context, KeyValue, Context, KeyValue,
}; };
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -91,9 +92,9 @@ pub struct BlockManager {
rc: BlockRc, rc: BlockRc,
resync_queue: SledCountedTree, resync_queue: db::Tree,
resync_notify: Notify, resync_notify: Notify,
resync_errors: SledCountedTree, resync_errors: db::Tree,
system: Arc<System>, system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>, endpoint: Arc<Endpoint<BlockRpc, Self>>,
@ -108,7 +109,7 @@ struct BlockManagerLocked();
impl BlockManager { impl BlockManager {
pub fn new( pub fn new(
db: &sled::Db, db: &db::Db,
data_dir: PathBuf, data_dir: PathBuf,
compression_level: Option<i32>, compression_level: Option<i32>,
background_tranquility: u32, background_tranquility: u32,
@ -123,12 +124,10 @@ impl BlockManager {
let resync_queue = db let resync_queue = db
.open_tree("block_local_resync_queue") .open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree"); .expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db let resync_errors = db
.open_tree("block_local_resync_errors") .open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree"); .expect("Unable to open block_local_resync_errors tree");
let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system let endpoint = system
.netapp .netapp
@ -219,7 +218,7 @@ impl BlockManager {
/// to fix any mismatch between the two. /// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table. // 1. Repair blocks from RC table.
for (i, entry) in self.rc.rc.iter().enumerate() { for (i, entry) in self.rc.rc.iter()?.enumerate() {
let (hash, _) = entry?; let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap(); let hash = Hash::try_from(&hash[..]).unwrap();
self.put_to_resync(&hash, Duration::from_secs(0))?; self.put_to_resync(&hash, Duration::from_secs(0))?;
@ -265,17 +264,17 @@ impl BlockManager {
/// Get length of resync queue /// Get length of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len() self.resync_queue.len().unwrap() // TODO fix unwrap
} }
/// Get number of blocks that have an error /// Get number of blocks that have an error
pub fn resync_errors_len(&self) -> usize { pub fn resync_errors_len(&self) -> usize {
self.resync_errors.len() self.resync_errors.len().unwrap() // TODO fix unwrap
} }
/// Get number of items in the refcount table /// Get number of items in the refcount table
pub fn rc_len(&self) -> usize { pub fn rc_len(&self) -> usize {
self.rc.rc.len() self.rc.rc.len().unwrap() // TODO fix unwrap
} }
//// ----- Managing the reference counter ---- //// ----- Managing the reference counter ----
@ -503,12 +502,12 @@ impl BlockManager {
}); });
} }
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> {
let when = now_msec() + delay.as_millis() as u64; let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when) self.put_to_resync_at(hash, when)
} }
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> { fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> {
trace!("Put resync_queue: {} {:?}", when, hash); trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec(); let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref()); key.extend(hash.as_ref());
@ -547,11 +546,8 @@ impl BlockManager {
// - Ok(true) -> a block was processed (successfully or not) // - Ok(true) -> a block was processed (successfully or not)
// - Ok(false) -> no block was processed, but we are ready for the next iteration // - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter( async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
&self, if let Some(first_pair_res) = self.resync_queue.iter()?.next() {
must_exit: &mut watch::Receiver<bool>,
) -> Result<bool, sled::Error> {
if let Some(first_pair_res) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_pair_res?; let (time_bytes, hash_bytes) = first_pair_res?;
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
@ -966,7 +962,7 @@ impl ErrorCounter {
} }
} }
fn decode(data: sled::IVec) -> Self { fn decode<'a>(data: db::Value<'a>) -> Self {
Self { Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),

View file

@ -1,6 +1,6 @@
use opentelemetry::{global, metrics::*}; use opentelemetry::{global, metrics::*};
use garage_util::sled_counter::SledCountedTree; use garage_db as db;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics { pub struct BlockManagerMetrics {
@ -23,12 +23,12 @@ pub struct BlockManagerMetrics {
} }
impl BlockManagerMetrics { impl BlockManagerMetrics {
pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self { pub fn new(resync_queue: db::Tree, resync_errors: db::Tree) -> Self {
let meter = global::meter("garage_model/block"); let meter = global::meter("garage_model/block");
Self { Self {
_resync_queue_len: meter _resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| { .u64_value_observer("block.resync_queue_length", move |observer| {
observer.observe(resync_queue.len() as u64, &[]) observer.observe(resync_queue.len().unwrap() as u64, &[]) // TODO fix unwrap
}) })
.with_description( .with_description(
"Number of block hashes queued for local check and possible resync", "Number of block hashes queued for local check and possible resync",
@ -36,7 +36,7 @@ impl BlockManagerMetrics {
.init(), .init(),
_resync_errored_blocks: meter _resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| { .u64_value_observer("block.resync_errored_blocks", move |observer| {
observer.observe(resync_errors.len() as u64, &[]) observer.observe(resync_errors.len().unwrap() as u64, &[]) // TODO fix unwrap
}) })
.with_description("Number of block hashes whose last resync resulted in an error") .with_description("Number of block hashes whose last resync resulted in an error")
.init(), .init(),

View file

@ -1,5 +1,7 @@
use std::convert::TryInto; use std::convert::TryInto;
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::time::*; use garage_util::time::*;
@ -7,31 +9,47 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY; use crate::manager::BLOCK_GC_DELAY;
pub struct BlockRc { pub struct BlockRc {
pub(crate) rc: sled::Tree, pub(crate) rc: db::Tree,
} }
impl BlockRc { impl BlockRc {
pub(crate) fn new(rc: sled::Tree) -> Self { pub(crate) fn new(rc: db::Tree) -> Self {
Self { rc } Self { rc }
} }
/// Increment the reference counter associated to a hash. /// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero. /// Returns true if the RC goes from zero to nonzero.
pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
let old_rc = self let old_rc = self.rc.db().transaction(|tx| {
.rc let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
.fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; match old_rc.increment().serialize() {
let old_rc = RcEntry::parse_opt(old_rc); Some(x) => {
tx.insert(&self.rc, &hash, x)?;
}
None => {
tx.remove(&self.rc, &hash)?;
}
};
tx.commit(old_rc)
})?;
Ok(old_rc.is_zero()) Ok(old_rc.is_zero())
} }
/// Decrement the reference counter associated to a hash. /// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero. /// Returns true if the RC is now zero.
pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
let new_rc = self let new_rc = self.rc.db().transaction(|tx| {
.rc let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
.update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; match new_rc.serialize() {
let new_rc = RcEntry::parse_opt(new_rc); Some(x) => {
tx.insert(&self.rc, &hash, x)?;
}
None => {
tx.remove(&self.rc, &hash)?;
}
};
tx.commit(new_rc)
})?;
Ok(matches!(new_rc, RcEntry::Deletable { .. })) Ok(matches!(new_rc, RcEntry::Deletable { .. }))
} }
@ -44,12 +62,15 @@ impl BlockRc {
/// deletion time has passed /// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec(); let now = now_msec();
self.rc.update_and_fetch(&hash, |rcval| { self.rc.db().transaction(|tx| {
let updated = match RcEntry::parse_opt(rcval) { let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, match rcval {
v => v, RcEntry::Deletable { at_time } if now > at_time => {
tx.remove(&self.rc, &hash)?;
}
_ => (),
}; };
updated.serialize() tx.commit(())
})?; })?;
Ok(()) Ok(())
} }

View file

@ -3,26 +3,31 @@ pub mod sled_adapter;
#[cfg(test)] #[cfg(test)]
pub mod test; pub mod test;
use core::ops::{Bound, RangeBounds};
use std::borrow::Cow; use std::borrow::Cow;
use std::sync::Arc; use std::sync::Arc;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use err_derive::Error;
#[derive(Clone)] #[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>); pub struct Db(pub(crate) Arc<dyn IDb>);
#[derive(Clone)] #[derive(Clone, Copy)]
pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>); pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
#[derive(Clone)] #[derive(Clone)]
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize); pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
pub type Value<'a> = Cow<'a, [u8]>; pub type Value<'a> = Cow<'a, [u8]>;
pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>; pub type ValueIter<'a> =
Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + Sync + 'a>;
// ---- // ----
#[derive(Debug)] #[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct Error(Cow<'static, str>); pub struct Error(Cow<'static, str>);
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@ -43,14 +48,14 @@ impl<E> From<Error> for TxError<E> {
// ---- // ----
impl Db { impl Db {
pub fn tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> { pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
let tree_id = self.0.tree(name.as_ref())?; let tree_id = self.0.open_tree(name.as_ref())?;
Ok(Tree(self.0.clone(), tree_id)) Ok(Tree(self.0.clone(), tree_id))
} }
pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E> pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync, R: Send + Sync,
E: Send + Sync, E: Send + Sync,
{ {
@ -83,20 +88,50 @@ impl Db {
} }
impl Tree { impl Tree {
pub fn db(&self) -> Db {
Db(self.0.clone())
}
pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> { pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> {
self.0.get(self.1, key.as_ref()) self.0.get(self.1, key.as_ref())
} }
pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> { pub fn len(&self) -> Result<usize> {
self.0.put(self.1, key.as_ref(), value.as_ref()) self.0.len(self.1)
} }
pub fn iter<'a>(&'a self, reverse: bool) -> Result<ValueIter<'a>> { pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
self.0.range(self.1, None, reverse) self.0.insert(self.1, key.as_ref(), value.as_ref())
} }
pub fn range<'a, T: AsRef<[u8]>>(&'a self, start: T, reverse: bool) -> Result<ValueIter<'a>> { pub fn remove<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<bool> {
self.0.range(self.1, Some(start.as_ref()), reverse) self.0.remove(self.1, key.as_ref())
}
pub fn iter<'a>(&'a self) -> Result<ValueIter<'a>> {
self.0.iter(self.1)
}
pub fn iter_rev<'a>(&'a self) -> Result<ValueIter<'a>> {
self.0.iter_rev(self.1)
}
pub fn range<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range(self.1, get_bound(sb), get_bound(eb))
}
pub fn range_rev<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
} }
} }
@ -105,8 +140,17 @@ impl<'a> Transaction<'a> {
self.0.get(tree.1, key.as_ref()) self.0.get(tree.1, key.as_ref())
} }
pub fn put<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, tree: &Tree, key: T, value: U) -> Result<()> { pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
self.0.put(tree.1, key.as_ref(), value.as_ref()) &self,
tree: &Tree,
key: T,
value: U,
) -> Result<()> {
self.0.insert(tree.1, key.as_ref(), value.as_ref())
}
pub fn remove<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<bool> {
self.0.remove(tree.1, key.as_ref())
} }
#[must_use] #[must_use]
@ -131,15 +175,28 @@ impl<'a> Transaction<'a> {
// ---- Internal interfaces // ---- Internal interfaces
pub(crate) trait IDb: Send + Sync { pub(crate) trait IDb: Send + Sync {
fn tree(&self, name: &str) -> Result<usize>; fn open_tree(&self, name: &str) -> Result<usize>;
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; fn len(&self, tree: usize) -> Result<usize>;
fn range<'a>(
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>;
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>;
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>;
fn range<'a, 'r>(
&'a self, &'a self,
tree: usize, tree: usize,
start: Option<&[u8]>, low: Bound<&'r [u8]>,
reverse: bool, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>;
fn range_rev<'a, 'r>(
&'a self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>; ) -> Result<ValueIter<'a>>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>; fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
@ -147,10 +204,11 @@ pub(crate) trait IDb: Send + Sync {
pub(crate) trait ITx<'a> { pub(crate) trait ITx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>; fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>; fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>;
} }
pub(crate) trait ITxFn: Send + Sync { pub(crate) trait ITxFn {
fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult; fn try_on<'a>(&'a self, tx: &'a dyn ITx<'a>) -> TxFnResult;
} }
@ -162,7 +220,7 @@ enum TxFnResult {
struct TxFn<F, R, E> struct TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync, R: Send + Sync,
E: Send + Sync, E: Send + Sync,
{ {
@ -172,7 +230,7 @@ where
impl<F, R, E> ITxFn for TxFn<F, R, E> impl<F, R, E> ITxFn for TxFn<F, R, E>
where where
F: Fn(Transaction<'_>) -> TxResult<R, E> + Send + Sync, F: Fn(Transaction<'_>) -> TxResult<R, E>,
R: Send + Sync, R: Send + Sync,
E: Send + Sync, E: Send + Sync,
{ {
@ -187,3 +245,13 @@ where
retval retval
} }
} }
// ----
fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
match b {
Bound::Included(v) => Bound::Included(v.as_ref()),
Bound::Excluded(v) => Bound::Excluded(v.as_ref()),
Bound::Unbounded => Bound::Unbounded,
}
}

View file

@ -1,3 +1,5 @@
use core::ops::Bound;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -42,7 +44,7 @@ impl SledDb {
} }
impl IDb for SledDb { impl IDb for SledDb {
fn tree(&self, name: &str) -> Result<usize> { fn open_tree(&self, name: &str) -> Result<usize> {
let mut trees = self.trees.write().unwrap(); let mut trees = self.trees.write().unwrap();
if let Some(i) = trees.1.get(name) { if let Some(i) = trees.1.get(name) {
Ok(*i) Ok(*i)
@ -60,42 +62,63 @@ impl IDb for SledDb {
Ok(tree.get(key)?.map(|v| v.to_vec().into())) Ok(tree.get(key)?.map(|v| v.to_vec().into()))
} }
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
Ok(tree.remove(key)?.is_some())
}
fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
Ok(tree.len())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
tree.insert(key, value)?; tree.insert(key, value)?;
Ok(()) Ok(())
} }
fn range<'a>( fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().map(|v| {
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
.map_err(Into::into)
})))
}
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().rev().map(|v| {
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
.map_err(Into::into)
})))
}
fn range<'a, 'r>(
&'a self, &'a self,
tree: usize, tree: usize,
start: Option<&[u8]>, low: Bound<&'r [u8]>,
reverse: bool, high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> { ) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
if reverse { Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| {
match start { v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
Some(start) => Ok(Box::new(tree.range(..=start).rev().map(|v| { .map_err(Into::into)
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) })))
.map_err(Into::into) }
}))), fn range_rev<'a, 'r>(
None => Ok(Box::new(tree.iter().rev().map(|v| { &'a self,
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) tree: usize,
.map_err(Into::into) low: Bound<&'r [u8]>,
}))), high: Bound<&'r [u8]>,
} ) -> Result<ValueIter<'a>> {
} else { let tree = self.get_tree(tree)?;
match start { Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map(
Some(start) => Ok(Box::new(tree.range(start..).map(|v| { |v| {
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into())) v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
.map_err(Into::into) .map_err(Into::into)
}))), },
None => Ok(Box::new(tree.iter().map(|v| { )))
v.map(|(x, y)| (x.to_vec().into(), y.to_vec().into()))
.map_err(Into::into)
}))),
}
}
} }
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
@ -162,7 +185,7 @@ impl<'a> ITx<'a> for SledTx<'a> {
Ok(tmp.map(|v| v.to_vec().into())) Ok(tmp.map(|v| v.to_vec().into()))
} }
fn put(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self let tree = self
.trees .trees
.get(tree) .get(tree)
@ -170,4 +193,12 @@ impl<'a> ITx<'a> for SledTx<'a> {
self.save_error(tree.insert(key, value))?; self.save_error(tree.insert(key, value))?;
Ok(()) Ok(())
} }
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self
.trees
.get(tree)
.ok_or(Error("invalid tree id".into()))?;
Ok(self.save_error(tree.remove(key))?.is_some())
}
} }

View file

@ -3,21 +3,22 @@ use crate::*;
use crate::sled_adapter::SledDb; use crate::sled_adapter::SledDb;
fn test_suite(db: Db) -> Result<()> { fn test_suite(db: Db) -> Result<()> {
let tree = db.tree("tree")?; let tree = db.open_tree("tree")?;
let ka: &[u8] = &b"test"[..]; let ka: &[u8] = &b"test"[..];
let kb: &[u8] = &b"zwello"[..]; let kb: &[u8] = &b"zwello"[..];
let kint: &[u8] = &b"tz"[..];
let va: &[u8] = &b"plop"[..]; let va: &[u8] = &b"plop"[..];
let vb: &[u8] = &b"plip"[..]; let vb: &[u8] = &b"plip"[..];
let vc: &[u8] = &b"plup"[..]; let vc: &[u8] = &b"plup"[..];
tree.put(ka, va)?; tree.insert(ka, va)?;
assert_eq!(tree.get(ka)?, Some(va.into())); assert_eq!(tree.get(ka)?, Some(va.into()));
let res = db.transaction::<_, (), _>(|tx| { let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka)?, Some(va.into())); assert_eq!(tx.get(&tree, ka)?, Some(va.into()));
tx.put(&tree, ka, vb)?; tx.insert(&tree, ka, vb)?;
assert_eq!(tx.get(&tree, ka)?, Some(vb.into())); assert_eq!(tx.get(&tree, ka)?, Some(vb.into()));
@ -29,7 +30,7 @@ fn test_suite(db: Db) -> Result<()> {
let res = db.transaction::<(), _, _>(|tx| { let res = db.transaction::<(), _, _>(|tx| {
assert_eq!(tx.get(&tree, ka)?, Some(vb.into())); assert_eq!(tx.get(&tree, ka)?, Some(vb.into()));
tx.put(&tree, ka, vc)?; tx.insert(&tree, ka, vc)?;
assert_eq!(tx.get(&tree, ka)?, Some(vc.into())); assert_eq!(tx.get(&tree, ka)?, Some(vc.into()));
@ -38,27 +39,27 @@ fn test_suite(db: Db) -> Result<()> {
assert!(matches!(res, Err(TxError::Abort(42)))); assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka)?, Some(vb.into())); assert_eq!(tree.get(ka)?, Some(vb.into()));
let mut iter = tree.iter(false)?; let mut iter = tree.iter()?;
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none()); assert!(iter.next().is_none());
tree.put(kb, vc)?; tree.insert(kb, vc)?;
assert_eq!(tree.get(kb)?, Some(vc.into())); assert_eq!(tree.get(kb)?, Some(vc.into()));
let mut iter = tree.iter(false)?; let mut iter = tree.iter()?;
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert!(iter.next().is_none()); assert!(iter.next().is_none());
let mut iter = tree.range("tz", false)?; let mut iter = tree.range(kint..)?;
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert!(iter.next().is_none()); assert!(iter.next().is_none());
let mut iter = tree.range("tz", true)?; let mut iter = tree.range_rev(..kint)?;
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none()); assert!(iter.next().is_none());
let mut iter = tree.iter(true)?; let mut iter = tree.iter_rev()?;
assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into())); assert_eq!(iter.next().unwrap().unwrap(), (kb.into(), vc.into()));
assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into())); assert_eq!(iter.next().unwrap().unwrap(), (ka.into(), vb.into()));
assert!(iter.next().is_none()); assert!(iter.next().is_none());

View file

@ -21,6 +21,7 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" } garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.7.0", path = "../model" } garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }

View file

@ -727,7 +727,7 @@ impl AdminRpcHandler {
{ {
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed { if opt.detailed {
writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); writeln!(to, " number of items: {}", t.data.store.len().unwrap()).unwrap(); // TODO fix len unwrap
writeln!( writeln!(
to, to,
" Merkle tree size: {}", " Merkle tree size: {}",

View file

@ -1,3 +1,4 @@
use core::ops::Bound;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::watch; use tokio::sync::watch;
@ -65,9 +66,15 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![]; let mut pos = vec![];
while let Some((item_key, item_bytes)) = while let Some(item) = self
self.garage.version_table.data.store.get_gt(&pos)? .garage
.version_table
.data
.store
.range((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{ {
let (item_key, item_bytes) = item?;
pos = item_key.to_vec(); pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
@ -109,9 +116,16 @@ impl Repair {
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![]; let mut pos = vec![];
while let Some((item_key, item_bytes)) = while let Some(item) = self
self.garage.block_ref_table.data.store.get_gt(&pos)? .garage
.block_ref_table
.data
.store
.range((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{ {
let (item_key, item_bytes) = item?;
pos = item_key.to_vec(); pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;

View file

@ -38,6 +38,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.flush_every_ms(Some(config.sled_flush_every_ms)) .flush_every_ms(Some(config.sled_flush_every_ms))
.open() .open()
.expect("Unable to open sled DB"); .expect("Unable to open sled DB");
let db = garage_db::sled_adapter::SledDb::new(db);
info!("Initializing background runner..."); info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c(); let watch_cancel = netapp::util::watch_ctrl_c();

View file

@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" } garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" } garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" } garage_block = { version = "0.7.0", path = "../block" }
@ -30,8 +31,6 @@ tracing = "0.1.30"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
sled = "0.34"
rmp-serde = "0.15" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -2,6 +2,8 @@ use std::sync::Arc;
use netapp::NetworkKey; use netapp::NetworkKey;
use garage_db as db;
use garage_util::background::*; use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
@ -33,7 +35,7 @@ pub struct Garage {
pub config: Config, pub config: Config,
/// The local database /// The local database
pub db: sled::Db, pub db: db::Db,
/// A background job runner /// A background job runner
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
/// The membership manager /// The membership manager
@ -71,7 +73,7 @@ pub struct GarageK2V {
impl Garage { impl Garage {
/// Create and run garage /// Create and run garage
pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
let network_key = NetworkKey::from_slice( let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
) )
@ -199,7 +201,7 @@ impl Garage {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
impl GarageK2V { impl GarageK2V {
fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table..."); info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager..."); info!("Initialize K2V subscription manager...");

View file

@ -6,6 +6,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use garage_db as db;
use garage_rpc::ring::Ring; use garage_rpc::ring::Ring;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_util::data::*; use garage_util::data::*;
@ -135,7 +137,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CounterSchema> { pub struct IndexCounter<T: CounterSchema> {
this_node: Uuid, this_node: Uuid,
local_counter: sled::Tree, local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
} }
@ -144,7 +146,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn new( pub fn new(
system: Arc<System>, system: Arc<System>,
replication: TableShardedReplication, replication: TableShardedReplication,
db: &sled::Db, db: &db::Db,
) -> Arc<Self> { ) -> Arc<Self> {
let background = system.background.clone(); let background = system.background.clone();
@ -177,12 +179,12 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk); let tree_key = self.table.data.tree_key(pk, sk);
let new_entry = self.local_counter.transaction(|tx| { let new_entry = self.local_counter.db().transaction(|tx| {
let mut entry = match tx.get(&tree_key[..])? { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => { Some(old_bytes) => {
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
.map_err(Error::RmpDecode) .map_err(Error::RmpDecode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)? .map_err(db::TxError::Abort)?
} }
None => LocalCounterEntry { None => LocalCounterEntry {
values: BTreeMap::new(), values: BTreeMap::new(),
@ -197,8 +199,8 @@ impl<T: CounterSchema> IndexCounter<T> {
let new_entry_bytes = rmp_to_vec_all_named(&entry) let new_entry_bytes = rmp_to_vec_all_named(&entry)
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?; .map_err(db::TxError::Abort)?;
tx.insert(&tree_key[..], new_entry_bytes)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
Ok(entry) Ok(entry)
})?; })?;

View file

@ -25,7 +25,7 @@ impl Migrate {
.open_tree("bucket:table") .open_tree("bucket:table")
.map_err(GarageError::from)?; .map_err(GarageError::from)?;
for res in tree.iter() { for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?; let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?; .map_err(GarageError::from)?;

View file

@ -26,8 +26,6 @@ hexdump = "0.1"
tracing = "0.1.30" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
sled = "0.34"
rmp-serde = "0.15" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -3,12 +3,12 @@ use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use sled::{IVec, Transactional};
use tokio::sync::Notify; use tokio::sync::Notify;
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -25,12 +25,12 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub instance: F, pub instance: F,
pub replication: R, pub replication: R,
pub store: sled::Tree, pub store: db::Tree,
pub(crate) merkle_tree: sled::Tree, pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: sled::Tree, pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify, pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: SledCountedTree, pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics, pub(crate) metrics: TableMetrics,
} }
@ -40,7 +40,7 @@ where
F: TableSchema, F: TableSchema,
R: TableReplication, R: TableReplication,
{ {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME)) .open_tree(&format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
@ -55,7 +55,6 @@ where
let gc_todo = db let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone()); let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@ -98,30 +97,30 @@ where
None => partition_hash.to_vec(), None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk), Some(sk) => self.tree_key(partition_key, sk),
}; };
let range = self.store.range(first_key..); let range = self.store.range(first_key..)?;
self.read_range_aux(partition_hash, range, filter, limit) self.read_range_aux(partition_hash, range, filter, limit)
} }
EnumerationOrder::Reverse => match start { EnumerationOrder::Reverse => match start {
Some(sk) => { Some(sk) => {
let last_key = self.tree_key(partition_key, sk); let last_key = self.tree_key(partition_key, sk);
let range = self.store.range(..=last_key).rev(); let range = self.store.range_rev(..=last_key)?;
self.read_range_aux(partition_hash, range, filter, limit) self.read_range_aux(partition_hash, range, filter, limit)
} }
None => { None => {
let mut last_key = partition_hash.to_vec(); let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap()); let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1)); last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
let range = self.store.range(..last_key).rev(); let range = self.store.range_rev(..last_key)?;
self.read_range_aux(partition_hash, range, filter, limit) self.read_range_aux(partition_hash, range, filter, limit)
} }
}, },
} }
} }
fn read_range_aux( fn read_range_aux<'a>(
&self, &self,
partition_hash: Hash, partition_hash: Hash,
range: impl Iterator<Item = sled::Result<(IVec, IVec)>>, range: db::ValueIter<'a>,
filter: &Option<F::Filter>, filter: &Option<F::Filter>,
limit: usize, limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> { ) -> Result<Vec<Arc<ByteBuf>>, Error> {
@ -183,12 +182,10 @@ where
tree_key: &[u8], tree_key: &[u8],
f: impl Fn(Option<F::E>) -> F::E, f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { let changed = self.store.db().transaction(|tx| {
let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
Some(old_bytes) => { Some(old_bytes) => {
let old_entry = self let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
.decode_entry(&old_bytes)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
let new_entry = f(Some(old_entry.clone())); let new_entry = f(Some(old_entry.clone()));
(Some(old_entry), Some(old_bytes), new_entry) (Some(old_entry), Some(old_bytes), new_entry)
} }
@ -204,13 +201,17 @@ where
// the associated Merkle tree entry. // the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry) let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?; .map_err(db::TxError::Abort)?;
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
if value_changed || encoding_changed { if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]); let new_bytes_hash = blake2sum(&new_bytes[..]);
mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; tx.insert(
store.insert(tree_key.to_vec(), new_bytes)?; &self.merkle_todo,
tree_key.to_vec(),
new_bytes_hash.as_slice(),
)?;
tx.insert(&self.store, tree_key.to_vec(), new_bytes)?;
Ok(Some((old_entry, new_entry, new_bytes_hash))) Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else { } else {
Ok(None) Ok(None)
@ -244,11 +245,11 @@ where
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { let removed = self.store.db().transaction(|tx| {
if let Some(cur_v) = store.get(k)? { if let Some(cur_v) = tx.get(&self.store, k)? {
if cur_v == v { if cur_v == v {
store.remove(k)?; tx.remove(&self.store, k)?;
mkl_todo.insert(k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
return Ok(true); return Ok(true);
} }
} }
@ -270,12 +271,12 @@ where
k: &[u8], k: &[u8],
vhash: Hash, vhash: Hash,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { let removed = self.store.db().transaction(|tx| {
if let Some(cur_v) = store.get(k)? { if let Some(cur_v) = tx.get(&self.store, k)? {
if blake2sum(&cur_v[..]) == vhash { if blake2sum(&cur_v[..]) == vhash {
store.remove(k)?; tx.remove(&self.store, k)?;
mkl_todo.insert(k, vec![])?; tx.insert(&self.merkle_todo, k, vec![])?;
return Ok(Some(cur_v)); return Ok(Some(cur_v.into_owned()));
} }
} }
Ok(None) Ok(None)
@ -316,6 +317,6 @@ where
} }
pub fn gc_todo_len(&self) -> usize { pub fn gc_todo_len(&self) -> usize {
self.gc_todo.len() self.gc_todo.len().unwrap() // TODO fix unwrap
} }
} }

View file

@ -12,9 +12,10 @@ use futures::select;
use futures_util::future::*; use futures_util::future::*;
use tokio::sync::watch; use tokio::sync::watch;
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*; use garage_util::time::*;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -106,7 +107,7 @@ where
// List entries in the GC todo list // List entries in the GC todo list
// These entries are put there when a tombstone is inserted in the table // These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs) // (see update_entry in data.rs)
for entry_kv in self.data.gc_todo.iter() { for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?; let (k, vhash) = entry_kv?;
let mut todo_entry = GcTodoEntry::parse(&k, &vhash); let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
@ -353,17 +354,17 @@ impl GcTodoEntry {
} }
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self { Self {
tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
key: sled_k[8..].to_vec(), key: db_k[8..].to_vec(),
value_hash: Hash::try_from(sled_v).unwrap(), value_hash: Hash::try_from(db_v).unwrap(),
value: None, value: None,
} }
} }
/// Saves the GcTodoEntry in the gc_todo tree /// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(()) Ok(())
} }
@ -373,12 +374,15 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition /// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e. /// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same /// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> { pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( let key = self.todo_table_key();
&self.todo_table_key()[..], gc_todo_tree.db().transaction(|tx| {
Some(self.value_hash), let old_val = tx.get(gc_todo_tree, &key)?;
None, if old_val == Some(self.value_hash.as_slice().into()) {
)?; tx.remove(gc_todo_tree, &key)?;
}
tx.commit(())
})?;
Ok(()) Ok(())
} }

View file

@ -4,11 +4,10 @@ use std::time::Duration;
use futures::select; use futures::select;
use futures_util::future::*; use futures_util::future::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
};
use tokio::sync::watch; use tokio::sync::watch;
use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
@ -90,7 +89,8 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() { while !*must_exit.borrow() {
if let Some(x) = self.data.merkle_todo.iter().next() { if let Some(x) = self.data.merkle_todo.iter().unwrap().next() {
// TODO unwrap to remove
match x { match x {
Ok((key, valhash)) => { Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) { if let Err(e) = self.update_item(&key[..], &valhash[..]) {
@ -137,13 +137,18 @@ where
}; };
self.data self.data
.merkle_tree .merkle_tree
.db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self let deleted = self.data.merkle_todo.db().transaction(|tx| {
.data let old_val = tx.get(&self.data.merkle_todo, k)?;
.merkle_todo if old_val == Some(vhash_by.into()) {
.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? tx.remove(&self.data.merkle_todo, k)?;
.is_ok(); tx.commit(true)
} else {
tx.commit(false)
}
})?;
if !deleted { if !deleted {
debug!( debug!(
@ -157,12 +162,12 @@ where
fn update_item_rec( fn update_item_rec(
&self, &self,
tx: &TransactionalTree, tx: db::Transaction<'_>,
k: &[u8], k: &[u8],
khash: &Hash, khash: &Hash,
key: &MerkleNodeKey, key: &MerkleNodeKey,
new_vhash: Option<Hash>, new_vhash: Option<Hash>,
) -> ConflictableTransactionResult<Option<Hash>, Error> { ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len(); let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key) // Read node at current position (defined by the prefix stored in key)
@ -203,7 +208,7 @@ where
} }
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => { x @ MerkleNode::Leaf(_, _) => {
tx.remove(key_sub.encode())?; tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x) Some(x)
} }
} }
@ -283,28 +288,27 @@ where
fn read_node_txn( fn read_node_txn(
&self, &self,
tx: &TransactionalTree, tx: db::Transaction<'_>,
k: &MerkleNodeKey, k: &MerkleNodeKey,
) -> ConflictableTransactionResult<MerkleNode, Error> { ) -> db::TxResult<MerkleNode, Error> {
let ent = tx.get(k.encode())?; let ent = tx.get(&self.data.merkle_tree, k.encode())?;
MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) MerkleNode::decode_opt(ent).map_err(db::TxError::Abort)
} }
fn put_node_txn( fn put_node_txn(
&self, &self,
tx: &TransactionalTree, tx: db::Transaction<'_>,
k: &MerkleNodeKey, k: &MerkleNodeKey,
v: &MerkleNode, v: &MerkleNode,
) -> ConflictableTransactionResult<Hash, Error> { ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v); trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty { if *v == MerkleNode::Empty {
tx.remove(k.encode())?; tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash) Ok(self.empty_node_hash)
} else { } else {
let vby = rmp_to_vec_all_named(v) let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
.map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]); let rethash = blake2sum(&vby[..]);
tx.insert(k.encode(), vby)?; tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash) Ok(rethash)
} }
} }
@ -316,11 +320,11 @@ where
} }
pub fn merkle_tree_len(&self) -> usize { pub fn merkle_tree_len(&self) -> usize {
self.data.merkle_tree.len() self.data.merkle_tree.len().unwrap() // TODO fix unwrap
} }
pub fn todo_len(&self) -> usize { pub fn todo_len(&self) -> usize {
self.data.merkle_todo.len() self.data.merkle_todo.len().unwrap() // TODO fix unwrap
} }
} }
@ -347,7 +351,7 @@ impl MerkleNodeKey {
} }
impl MerkleNode { impl MerkleNode {
fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { fn decode_opt(ent: Option<db::Value<'_>>) -> Result<Self, Error> {
match ent { match ent {
None => Ok(MerkleNode::Empty), None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),

View file

@ -1,6 +1,6 @@
use opentelemetry::{global, metrics::*, KeyValue}; use opentelemetry::{global, metrics::*, KeyValue};
use garage_util::sled_counter::SledCountedTree; use garage_db as db;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct TableMetrics { pub struct TableMetrics {
@ -19,11 +19,7 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>, pub(crate) sync_items_received: Counter<u64>,
} }
impl TableMetrics { impl TableMetrics {
pub fn new( pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: db::Tree) -> Self {
table_name: &'static str,
merkle_todo: sled::Tree,
gc_todo: SledCountedTree,
) -> Self {
let meter = global::meter(table_name); let meter = global::meter(table_name);
TableMetrics { TableMetrics {
_merkle_todo_len: meter _merkle_todo_len: meter
@ -31,7 +27,7 @@ impl TableMetrics {
"table.merkle_updater_todo_queue_length", "table.merkle_updater_todo_queue_length",
move |observer| { move |observer| {
observer.observe( observer.observe(
merkle_todo.len() as u64, merkle_todo.len().unwrap() as u64, // TODO fix unwrap
&[KeyValue::new("table_name", table_name)], &[KeyValue::new("table_name", table_name)],
) )
}, },
@ -43,7 +39,7 @@ impl TableMetrics {
"table.gc_todo_queue_length", "table.gc_todo_queue_length",
move |observer| { move |observer| {
observer.observe( observer.observe(
gc_todo.len() as u64, gc_todo.len().unwrap() as u64, // TODO fix unwrap
&[KeyValue::new("table_name", table_name)], &[KeyValue::new("table_name", table_name)],
) )
}, },

View file

@ -258,7 +258,7 @@ where
while !*must_exit.borrow() { while !*must_exit.borrow() {
let mut items = Vec::new(); let mut items = Vec::new();
for item in self.data.store.range(begin.to_vec()..end.to_vec()) { for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
let (key, value) = item?; let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
@ -603,7 +603,8 @@ impl SyncTodo {
let retain = nodes.contains(&my_id); let retain = nodes.contains(&my_id);
if !retain { if !retain {
// Check if we have some data to send, otherwise skip // Check if we have some data to send, otherwise skip
if data.store.range(begin..end).next().is_none() { if data.store.range(begin..end).unwrap().next().is_none() {
// TODO fix unwrap
continue; continue;
} }
} }

View file

@ -13,6 +13,8 @@ use opentelemetry::{
Context, Context,
}; };
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
@ -69,7 +71,7 @@ where
{ {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &sled::Db) -> Arc<Self> { pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));

View file

@ -14,6 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
garage_db = { version = "0.8.0", path = "../db" }
blake2 = "0.9" blake2 = "0.9"
err-derive = "0.3" err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
@ -22,8 +24,6 @@ tracing = "0.1.30"
rand = "0.8" rand = "0.8"
sha2 = "0.9" sha2 = "0.9"
sled = "0.34"
chrono = "0.4" chrono = "0.4"
rmp-serde = "0.15" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }

View file

@ -26,8 +26,8 @@ pub enum Error {
#[error(display = "Netapp error: {}", _0)] #[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error), Netapp(#[error(source)] netapp::error::Error),
#[error(display = "Sled error: {}", _0)] #[error(display = "DB error: {}", _0)]
Sled(#[error(source)] sled::Error), Db(#[error(source)] garage_db::Error),
#[error(display = "Messagepack encode error: {}", _0)] #[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error), RmpEncode(#[error(source)] rmp_serde::encode::Error),
@ -78,11 +78,11 @@ impl Error {
} }
} }
impl From<sled::transaction::TransactionError<Error>> for Error { impl From<garage_db::TxError<Error>> for Error {
fn from(e: sled::transaction::TransactionError<Error>) -> Error { fn from(e: garage_db::TxError<Error>) -> Error {
match e { match e {
sled::transaction::TransactionError::Abort(x) => x, garage_db::TxError::Abort(x) => x,
sled::transaction::TransactionError::Storage(x) => Error::Sled(x), garage_db::TxError::Db(x) => Error::Db(x),
} }
} }
} }

View file

@ -11,7 +11,7 @@ pub mod error;
pub mod formater; pub mod formater;
pub mod metrics; pub mod metrics;
pub mod persister; pub mod persister;
pub mod sled_counter; //pub mod sled_counter;
pub mod time; pub mod time;
pub mod token_bucket; pub mod token_bucket;
pub mod tranquilizer; pub mod tranquilizer;