Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
8 changed files with 90 additions and 85 deletions
Showing only changes of commit cc0d984118 - Show all commits

View file

@ -56,6 +56,8 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@ -640,7 +642,7 @@ impl BlockManager {
}
}
fn resync_get_next(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, db::Error> {
fn resync_get_next(&self) -> Result<OptKVPair, db::Error> {
match self.resync_queue.iter()?.next() {
None => Ok(None),
Some(v) => {
@ -970,7 +972,7 @@ impl ErrorCounter {
}
}
fn decode<'a>(data: db::Value<'a>) -> Self {
fn decode(data: db::Value<'_>) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),

View file

@ -29,14 +29,14 @@ pub type Exporter<'a> = Box<dyn std::iter::Iterator<Item = Result<(String, Value
pub struct Value<'a>(pub(crate) Box<dyn IValue<'a> + 'a>);
pub trait IValue<'a>: AsRef<[u8]> {
fn into_vec(&mut self) -> Vec<u8>;
pub trait IValue<'a>: AsRef<[u8]> + core::borrow::Borrow<[u8]> {
fn take_maybe(&mut self) -> Vec<u8>;
}
impl<'a> Value<'a> {
#[inline]
pub fn into_vec(mut self) -> Vec<u8> {
self.0.into_vec()
self.0.take_maybe()
}
}
@ -50,7 +50,7 @@ impl<'a> AsRef<[u8]> for Value<'a> {
impl<'a> std::borrow::Borrow<[u8]> for Value<'a> {
#[inline]
fn borrow(&self) -> &[u8] {
self.0.as_ref().as_ref()
self.0.as_ref().borrow()
}
}
@ -82,7 +82,7 @@ impl<'a> std::fmt::Debug for Value<'a> {
}
impl<'a> IValue<'a> for Vec<u8> {
fn into_vec(&mut self) -> Vec<u8> {
fn take_maybe(&mut self) -> Vec<u8> {
std::mem::take(self)
}
}
@ -106,7 +106,7 @@ impl<'a> From<&'a [u8]> for Value<'a> {
}
impl<'a> IValue<'a> for &'a [u8] {
fn into_vec(&mut self) -> Vec<u8> {
fn take_maybe(&mut self) -> Vec<u8> {
self.to_vec()
}
}
@ -175,21 +175,22 @@ impl Db {
}
}
pub fn export<'a>(&'a self) -> Result<Exporter<'a>> {
pub fn export(&self) -> Result<Exporter<'_>> {
self.0.export()
}
pub fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
pub fn import(&self, ex: Exporter<'_>) -> Result<()> {
self.0.import(ex)
}
}
#[allow(clippy::len_without_is_empty)]
impl Tree {
pub fn db(&self) -> Db {
Db(self.0.clone())
}
pub fn get<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<Option<Value<'a>>> {
pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value<'_>>> {
self.0.get(self.1, key.as_ref())
}
pub fn len(&self) -> Result<usize> {
@ -199,18 +200,18 @@ impl Tree {
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
self.0.insert(self.1, key.as_ref(), value.as_ref())
}
pub fn remove<'a, T: AsRef<[u8]>>(&'a self, key: T) -> Result<bool> {
pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<bool> {
self.0.remove(self.1, key.as_ref())
}
pub fn iter<'a>(&'a self) -> Result<ValueIter<'a>> {
pub fn iter(&self) -> Result<ValueIter<'_>> {
self.0.iter(self.1)
}
pub fn iter_rev<'a>(&'a self) -> Result<ValueIter<'a>> {
pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
self.0.iter_rev(self.1)
}
pub fn range<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>>
pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
@ -219,7 +220,7 @@ impl Tree {
let eb = range.end_bound();
self.0.range(self.1, get_bound(sb), get_bound(eb))
}
pub fn range_rev<'a, K, R>(&'a self, range: R) -> Result<ValueIter<'a>>
pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
@ -230,6 +231,7 @@ impl Tree {
}
}
#[allow(clippy::len_without_is_empty)]
impl<'a> Transaction<'a> {
pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value<'a>>> {
self.0.get(tree.1, key.as_ref())
@ -278,12 +280,10 @@ impl<'a> Transaction<'a> {
// ----
#[must_use]
pub fn abort<R, E>(self, e: E) -> TxResult<R, E> {
Err(TxError::Abort(e))
}
#[must_use]
pub fn commit<R, E>(self, r: R) -> TxResult<R, E> {
Ok(r)
}
@ -294,32 +294,32 @@ impl<'a> Transaction<'a> {
pub(crate) trait IDb: Send + Sync {
fn open_tree(&self, name: &str) -> Result<usize>;
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>>;
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>>;
fn len(&self, tree: usize) -> Result<usize>;
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()>;
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool>;
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>;
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>>;
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
fn range<'a, 'r>(
&'a self,
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>;
fn range_rev<'a, 'r>(
&'a self,
) -> Result<ValueIter<'_>>;
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>>;
) -> Result<ValueIter<'_>>;
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()>;
fn export<'a>(&'a self) -> Result<Exporter<'a>>;
fn import<'a>(&self, ex: Exporter<'a>) -> Result<()>;
fn export(&self) -> Result<Exporter<'_>>;
fn import(&self, ex: Exporter<'_>) -> Result<()>;
}
pub(crate) trait ITx<'a> {

View file

@ -27,7 +27,7 @@ impl From<sled::Error> for Error {
// -- val
impl<'a> IValue<'a> for sled::IVec {
fn into_vec(&mut self) -> Vec<u8> {
fn take_maybe(&mut self) -> Vec<u8> {
self.to_vec()
}
}
@ -52,7 +52,7 @@ pub struct SledDb {
}
impl SledDb {
pub fn new(db: sled::Db) -> Db {
pub fn init(db: sled::Db) -> Db {
let s = Self {
db,
trees: RwLock::new((Vec::new(), HashMap::new())),
@ -67,7 +67,7 @@ impl SledDb {
.0
.get(i)
.cloned()
.ok_or(Error("invalid tree id".into()))
.ok_or_else(|| Error("invalid tree id".into()))
}
}
@ -87,7 +87,7 @@ impl IDb for SledDb {
// ----
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
Ok(tree.get(key)?.map(From::from))
}
@ -108,37 +108,37 @@ impl IDb for SledDb {
Ok(())
}
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().map(|v| {
v.map(|(x, y)| (x.into(), y.into())).map_err(Into::into)
})))
}
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.iter().rev().map(|v| {
v.map(|(x, y)| (x.into(), y.into())).map_err(Into::into)
})))
}
fn range<'a, 'r>(
&'a self,
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).map(|v| {
v.map(|(x, y)| (x.into(), y.into())).map_err(Into::into)
})))
}
fn range_rev<'a, 'r>(
&'a self,
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
Ok(Box::new(tree.range::<&'r [u8], _>((low, high)).rev().map(
|v| v.map(|(x, y)| (x.into(), y.into())).map_err(Into::into),
@ -178,7 +178,7 @@ impl IDb for SledDb {
// ----
fn export<'a>(&'a self) -> Result<Exporter<'a>> {
fn export(&self) -> Result<Exporter<'_>> {
let mut trees = vec![];
for name in self.db.tree_names() {
let name = std::str::from_utf8(&name)
@ -188,17 +188,17 @@ impl IDb for SledDb {
let tree = self.trees.read().unwrap().0.get(tree).unwrap().clone();
trees.push((name, tree));
}
let trees_exporter: Exporter<'a> = Box::new(trees.into_iter().map(|(name, tree)| {
let iter: ValueIter<'a> = Box::new(
let trees_exporter: Exporter<'_> = Box::new(trees.into_iter().map(|(name, tree)| {
let iter: ValueIter<'_> = Box::new(
tree.iter()
.map(|v| v.map(|(x, y)| (x.into(), y.into())).map_err(Into::into)),
);
Ok((name.to_string(), iter))
Ok((name, iter))
}));
Ok(trees_exporter)
}
fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
fn import(&self, ex: Exporter<'_>) -> Result<()> {
for ex_tree in ex {
let (name, data) = ex_tree?;
@ -234,9 +234,11 @@ struct SledTx<'a> {
impl<'a> SledTx<'a> {
fn get_tree(&self, i: usize) -> Result<&TransactionalTree> {
self.trees.get(i).ok_or(Error(
self.trees.get(i).ok_or_else(|| {
Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
))
)
})
}
fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> {

View file

@ -35,7 +35,7 @@ pub struct SqliteDb {
}
impl SqliteDb {
pub fn new(db: rusqlite::Connection) -> Db {
pub fn init(db: rusqlite::Connection) -> Db {
let s = Self {
db: Mutex::new(db),
trees: RwLock::new(Vec::new()),
@ -49,7 +49,7 @@ impl SqliteDb {
.unwrap()
.get(i)
.cloned()
.ok_or(Error("invalid tree id".into()))
.ok_or_else(|| Error("invalid tree id".into()))
}
}
@ -77,7 +77,7 @@ impl IDb for SqliteDb {
// ----
fn get<'a>(&'a self, tree: usize, key: &[u8]) -> Result<Option<Value<'a>>> {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
let db = self.db.lock().unwrap();
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
@ -102,7 +102,7 @@ impl IDb for SqliteDb {
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
Some(v) => Ok(v.get::<_, usize>(0)?.into()),
Some(v) => Ok(v.get::<_, usize>(0)?),
}
}
@ -116,24 +116,24 @@ impl IDb for SqliteDb {
Ok(())
}
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
}
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
}
fn range<'a, 'r>(
&'a self,
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
@ -143,18 +143,18 @@ impl IDb for SqliteDb {
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
DbValueIterator::new::<&[&dyn rusqlite::ToSql]>(
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
self.db.lock().unwrap(),
&sql,
params.as_ref(),
)
}
fn range_rev<'a, 'r>(
&'a self,
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'a>> {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
@ -164,7 +164,7 @@ impl IDb for SqliteDb {
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
DbValueIterator::new::<&[&dyn rusqlite::ToSql]>(
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
self.db.lock().unwrap(),
&sql,
params.as_ref(),
@ -200,11 +200,11 @@ impl IDb for SqliteDb {
// ----
fn export<'a>(&'a self) -> Result<Exporter<'a>> {
fn export(&self) -> Result<Exporter<'_>> {
unimplemented!()
}
fn import<'a>(&self, ex: Exporter<'a>) -> Result<()> {
fn import(&self, ex: Exporter<'_>) -> Result<()> {
unimplemented!()
}
@ -220,9 +220,11 @@ struct SqliteTx<'a> {
impl<'a> SqliteTx<'a> {
fn get_tree(&self, i: usize) -> Result<String> {
self.trees.get(i).cloned().ok_or(Error(
self.trees.get(i).cloned().ok_or_else(|| {
Error(
"invalid tree id (it might have been openned after the transaction started)".into(),
))
)
})
}
}
@ -244,7 +246,7 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
let mut res_iter = stmt.query([])?;
match res_iter.next()? {
None => Ok(0),
Some(v) => Ok(v.get::<_, usize>(0)?.into()),
Some(v) => Ok(v.get::<_, usize>(0)?),
}
}
@ -299,13 +301,13 @@ struct DbValueIterator<'a> {
}
impl<'a> DbValueIterator<'a> {
fn new<P: rusqlite::Params>(
fn make<P: rusqlite::Params>(
db: MutexGuard<'a, Connection>,
sql: &str,
args: P,
) -> Result<ValueIter<'a>> {
let res = DbValueIterator {
db: db,
db,
stmt: None,
iter: None,
_pin: PhantomPinned,
@ -314,7 +316,7 @@ impl<'a> DbValueIterator<'a> {
unsafe {
let db = NonNull::from(&boxed.db);
let stmt = db.as_ref().prepare(&sql)?;
let stmt = db.as_ref().prepare(sql)?;
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);

View file

@ -81,13 +81,13 @@ fn test_suite(db: Db) {
#[test]
fn test_sled_db() {
let path = mktemp::Temp::new_dir().unwrap();
let db = SledDb::new(sled::open(path.to_path_buf()).unwrap());
let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
test_suite(db);
drop(path);
}
#[test]
fn test_sqlite_db() {
let db = SqliteDb::new(rusqlite::Connection::open_in_memory().unwrap());
let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
test_suite(db);
}

View file

@ -16,6 +16,8 @@ pub struct Repair {
pub garage: Arc<Garage>,
}
type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
impl Repair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
@ -105,7 +107,7 @@ impl Repair {
Ok(())
}
fn get_next_version_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
fn get_next_version_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
match self
.garage
.version_table
@ -161,7 +163,8 @@ impl Repair {
Ok(())
}
fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
#[allow(clippy::type_complexity)]
fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
match self
.garage
.block_ref_table

View file

@ -40,7 +40,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.flush_every_ms(Some(config.sled_flush_every_ms))
.open()
.expect("Unable to open sled DB");
let db = db::sled_adapter::SledDb::new(db);
let db = db::sled_adapter::SledDb::init(db);
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();

View file

@ -206,12 +206,8 @@ where
if value_changed || encoding_changed {
let new_bytes_hash = blake2sum(&new_bytes[..]);
tx.insert(
&self.merkle_todo,
tree_key.to_vec(),
new_bytes_hash.as_slice(),
)?;
tx.insert(&self.store, tree_key.to_vec(), new_bytes)?;
tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, tree_key, new_bytes)?;
Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else {
Ok(None)