Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
11 changed files with 215 additions and 164 deletions
Showing only changes of commit a9e79f848b - Show all commits

View file

@ -20,6 +20,7 @@ use opentelemetry::{
};
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
@ -94,9 +95,9 @@ pub struct BlockManager {
rc: BlockRc,
resync_queue: db::Tree,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: db::Tree,
resync_errors: CountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@ -126,10 +127,14 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
let resync_queue =
CountedTree::new(resync_queue).expect("Could not count block_local_resync_queue");
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
let endpoint = system
.netapp
@ -299,12 +304,16 @@ impl BlockManager {
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
Ok(self.resync_queue.len()?)
// This currently can't return an error because the CountedTree hack
// doesn't error on .len(), but this will change when we remove the hack
// (hopefully someday!)
Ok(self.resync_queue.len())
}
/// Get number of blocks that have an error
pub fn resync_errors_len(&self) -> Result<usize, Error> {
Ok(self.resync_errors.len()?)
// (see resync_queue_len comment)
Ok(self.resync_errors.len())
}
/// Get number of items in the refcount table

View file

@ -1,6 +1,6 @@
use opentelemetry::{global, metrics::*};
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
@ -23,14 +23,12 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
pub fn new(resync_queue: db::Tree, resync_errors: db::Tree) -> Self {
pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| {
if let Ok(v) = resync_queue.len() {
observer.observe(v as u64, &[]);
}
observer.observe(resync_queue.len() as u64, &[]);
})
.with_description(
"Number of block hashes queued for local check and possible resync",
@ -38,9 +36,7 @@ impl BlockManagerMetrics {
.init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
if let Ok(v) = resync_errors.len() {
observer.observe(v as u64, &[]);
}
observer.observe(resync_errors.len() as u64, &[]);
})
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),

130
src/db/counted_tree_hack.rs Normal file
View file

@ -0,0 +1,130 @@
//! This hack allows a db tree to keep in RAM a counter of the number of entries
//! it contains, which is used to call .len() on it. This is usefull only for
//! the sled backend where .len() otherwise would have to traverse the whole
//! tree to count items. For sqlite and lmdb, this is mostly useless (but
//! hopefully not harmfull!). Note that a CountedTree cannot be part of a
//! transaction.
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use crate::{Result, Tree, TxError, Value, ValueIter};
#[derive(Clone)]
pub struct CountedTree(Arc<CountedTreeInternal>);
struct CountedTreeInternal {
tree: Tree,
len: AtomicUsize,
}
impl CountedTree {
pub fn new(tree: Tree) -> Result<Self> {
let len = tree.len()?;
Ok(Self(Arc::new(CountedTreeInternal {
tree,
len: AtomicUsize::new(len),
})))
}
pub fn len(&self) -> usize {
self.0.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Value>> {
self.0.tree.get(key)
}
pub fn first(&self) -> Result<Option<(Value, Value)>> {
self.0.tree.first()
}
pub fn iter(&self) -> Result<ValueIter<'_>> {
self.0.tree.iter()
}
// ---- writing functions ----
pub fn insert<K, V>(&self, key: K, value: V) -> Result<bool>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let inserted = self.0.tree.insert(key, value)?;
if inserted {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
Ok(inserted)
}
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<bool> {
let removed = self.0.tree.remove(key)?;
if removed {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
Ok(removed)
}
/*
pub fn pop_min(&self) -> Result<Option<(Value, Value)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {
self.0.len.fetch_sub(1, Ordering::Relaxed);
};
res
}
*/
pub fn compare_and_swap<K, OV, NV>(
&self,
key: K,
expected_old: Option<OV>,
new: Option<NV>,
) -> Result<bool>
where
K: AsRef<[u8]>,
OV: AsRef<[u8]>,
NV: AsRef<[u8]>,
{
let old_some = expected_old.is_some();
let new_some = new.is_some();
match self.0.tree.db().transaction(|mut tx| {
let old_val = tx.get(&self.0.tree, &key)?;
if old_val.as_ref().map(|x| &x[..]) == expected_old.as_ref().map(AsRef::as_ref) {
match &new {
Some(v) => {
tx.insert(&self.0.tree, &key, v)?;
}
None => {
tx.remove(&self.0.tree, &key)?;
}
}
tx.commit(())
} else {
tx.abort(())
}
}) {
Ok(()) => {
match (old_some, new_some) {
(false, true) => {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
(true, false) => {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
_ => (),
}
Ok(true)
}
Err(TxError::Abort(())) => Ok(false),
Err(TxError::Db(e)) => Err(e),
}
}
}

View file

@ -2,6 +2,8 @@ pub mod lmdb_adapter;
pub mod sled_adapter;
pub mod sqlite_adapter;
pub mod counted_tree_hack;
#[cfg(test)]
pub mod test;
@ -164,10 +166,13 @@ impl Tree {
.transpose()
}
/// True if item didn't exist before, false if item already existed
/// and was replaced.
#[inline]
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<bool> {
self.0.insert(self.1, key.as_ref(), value.as_ref())
}
/// True if item was removed, false if item already didn't exist
#[inline]
pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<bool> {
self.0.remove(self.1, key.as_ref())
@ -215,15 +220,18 @@ impl<'a> Transaction<'a> {
self.0.len(tree.1)
}
/// True if item didn't exist before, false if item already existed
/// and was replaced.
#[inline]
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
&mut self,
tree: &Tree,
key: T,
value: U,
) -> Result<()> {
) -> Result<bool> {
self.0.insert(tree.1, key.as_ref(), value.as_ref())
}
/// True if item was removed, false if item already didn't exist
#[inline]
pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<bool> {
self.0.remove(tree.1, key.as_ref())
@ -281,7 +289,7 @@ pub(crate) trait IDb: Send + Sync {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>;
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
@ -307,7 +315,7 @@ pub(crate) trait ITx {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
fn len(&self, tree: usize) -> Result<usize>;
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool>;
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;

View file

@ -122,12 +122,13 @@ impl IDb for LmdbDb {
Ok(tree.len(&tx)?.try_into().unwrap())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
let mut tx = self.db.write_txn()?;
let old_val = tree.get(&tx, key)?.map(Vec::from);
tree.put(&mut tx, key, value)?;
tx.commit()?;
Ok(())
Ok(old_val.is_none())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
@ -221,10 +222,11 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> {
unimplemented!(".len() in transaction not supported with LMDB backend")
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
let tree = *self.get_tree(tree)?;
let old_val = tree.get(&self.tx, key)?.map(Vec::from);
tree.put(&mut self.tx, key, value)?;
Ok(())
Ok(old_val.is_none())
}
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = *self.get_tree(tree)?;

View file

@ -93,10 +93,10 @@ impl IDb for SledDb {
Ok(tree.len())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
tree.insert(key, value)?;
Ok(())
let old_val = tree.insert(key, value)?;
Ok(old_val.is_none())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
@ -206,10 +206,10 @@ impl<'a> ITx for SledTx<'a> {
unimplemented!(".len() in transaction not supported with Sled backend")
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
self.save_error(tree.insert(key, value))?;
Ok(())
let old_val = self.save_error(tree.insert(key, value))?;
Ok(old_val.is_none())
}
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;

View file

@ -54,6 +54,17 @@ impl SqliteDbInner {
.map(String::as_str)
.ok_or_else(|| Error("invalid tree id".into()))
}
fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
let mut stmt = self
.db
.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
}
}
}
impl IDb for SqliteDb {
@ -111,15 +122,7 @@ impl IDb for SqliteDb {
trace!("get {}: lock acquired", tree);
let tree = this.get_tree(tree)?;
let mut stmt = this
.db
.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
match res_iter.next()? {
None => Ok(None),
Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
}
this.internal_get(tree, key)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
@ -148,17 +151,18 @@ impl IDb for SqliteDb {
}
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
trace!("insert {}: lock db", tree);
let this = self.0.lock().unwrap();
trace!("insert {}: lock acquired", tree);
let tree = this.get_tree(tree)?;
let old_val = this.internal_get(tree, key)?;
this.db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
params![key, value],
)?;
Ok(())
Ok(old_val.is_none())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
@ -276,11 +280,8 @@ impl<'a> SqliteTx<'a> {
)
})
}
}
impl<'a> ITx for SqliteTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
let tree = self.get_tree(tree)?;
fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> {
let mut stmt = self
.tx
.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
@ -290,6 +291,13 @@ impl<'a> ITx for SqliteTx<'a> {
Some(v) => Ok(Some(v.get::<_, Vec<u8>>(0)?)),
}
}
}
impl<'a> ITx for SqliteTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
let tree = self.get_tree(tree)?;
self.internal_get(tree, key)
}
fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
@ -300,13 +308,14 @@ impl<'a> ITx for SqliteTx<'a> {
}
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
let old_val = self.internal_get(tree, key)?;
self.tx.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
params![key, value],
)?;
Ok(())
Ok(old_val.is_none())
}
fn remove(&mut self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;

View file

@ -6,6 +6,7 @@ use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
@ -30,7 +31,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: db::Tree,
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
@ -55,6 +56,7 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
@ -319,6 +321,6 @@ where
}
pub fn gc_todo_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.len()?)
Ok(self.gc_todo.len())
}
}

View file

@ -12,7 +12,7 @@ use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
@ -370,7 +370,7 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
pub(crate) fn save(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@ -380,16 +380,12 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
let key = self.todo_table_key();
gc_todo_tree.db().transaction(|mut tx| {
let remove =
matches!(tx.get(gc_todo_tree, &key)?, Some(ov) if ov == self.value_hash.as_slice());
if remove {
tx.remove(gc_todo_tree, &key)?;
}
tx.commit(())
})?;
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &CountedTree) -> Result<(), Error> {
gc_todo_tree.compare_and_swap::<_, _, &[u8]>(
&self.todo_table_key(),
Some(self.value_hash),
None,
)?;
Ok(())
}

View file

@ -1,6 +1,7 @@
use opentelemetry::{global, metrics::*, KeyValue};
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
@ -19,7 +20,7 @@ pub struct TableMetrics {
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: db::Tree) -> Self {
pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
@ -40,12 +41,10 @@ impl TableMetrics {
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
if let Ok(v) = gc_todo.len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
);
},
)
.with_description("Table garbage collector TODO queue length")

View file

@ -1,100 +0,0 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
#[derive(Clone)]
pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
struct SledCountedTreeInternal {
tree: Tree,
len: AtomicUsize,
}
impl SledCountedTree {
pub fn new(tree: Tree) -> Self {
let len = tree.len();
Self(Arc::new(SledCountedTreeInternal {
tree,
len: AtomicUsize::new(len),
}))
}
pub fn len(&self) -> usize {
self.0.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.0.tree.is_empty()
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
self.0.tree.get(key)
}
pub fn iter(&self) -> Iter {
self.0.tree.iter()
}
// ---- writing functions ----
pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
where
K: AsRef<[u8]>,
V: Into<IVec>,
{
let res = self.0.tree.insert(key, value);
if res == Ok(None) {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
res
}
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
let res = self.0.tree.remove(key);
if matches!(res, Ok(Some(_))) {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
res
}
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {
self.0.len.fetch_sub(1, Ordering::Relaxed);
};
res
}
pub fn compare_and_swap<K, OV, NV>(
&self,
key: K,
old: Option<OV>,
new: Option<NV>,
) -> Result<std::result::Result<(), CompareAndSwapError>>
where
K: AsRef<[u8]>,
OV: AsRef<[u8]>,
NV: Into<IVec>,
{
let old_some = old.is_some();
let new_some = new.is_some();
let res = self.0.tree.compare_and_swap(key, old, new);
if res == Ok(Ok(())) {
match (old_some, new_some) {
(false, true) => {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
(true, false) => {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
_ => (),
}
}
res
}
}