garage/src/db/lib.rs
Zdenek Crha 4b54e053df convert_db: prevent conversion between same input/output engine
Use optional DB open overrides for both input and output database.

Duplicating the same override flag for input/output would result in too
many, too long flags. It would be too costly for the very rare edge case
of converting between the same DB engine, just with different flags.

Because override flags for different engines are disjoint and we are
preventing conversion between the same input/output DB engine, we can have
only one set.

The override flag will be passed either to input or output, based on
engine type it belongs to. It will never be passed to both of them and
cause unwelcome surprise to user.
2024-01-18 17:57:56 +01:00

473 lines
11 KiB
Rust

#[macro_use]
#[cfg(feature = "sqlite")]
extern crate tracing;
#[cfg(feature = "lmdb")]
pub mod lmdb_adapter;
#[cfg(feature = "sled")]
pub mod sled_adapter;
#[cfg(feature = "sqlite")]
pub mod sqlite_adapter;
pub mod counted_tree_hack;
#[cfg(test)]
pub mod test;
use core::ops::{Bound, RangeBounds};
use std::borrow::Cow;
use std::cell::Cell;
use std::sync::Arc;
use err_derive::Error;
/// Callbacks queued during a transaction through `Transaction::on_commit`.
/// `Db::transaction` runs them once the backend has reported success.
pub(crate) type OnCommit = Vec<Box<dyn FnOnce()>>;
/// Handle to a database: a reference-counted, type-erased backend
/// implementing the internal `IDb` trait. Cloning only clones the `Arc`.
#[derive(Clone)]
pub struct Db(pub(crate) Arc<dyn IDb>);
/// Handle given to the closure passed to `Db::transaction`.
///
/// Every operation is forwarded to the backend transaction `tx`; closures
/// registered with `on_commit` are accumulated in the `on_commit` field.
pub struct Transaction<'a> {
// Backend-specific transaction that all operations are forwarded to.
tx: &'a mut dyn ITx,
// Callbacks registered so far through `Transaction::on_commit`.
on_commit: OnCommit,
}
/// Handle to one tree of a database: the shared backend together with the
/// identifier the backend returned from `open_tree`.
#[derive(Clone)]
pub struct Tree(Arc<dyn IDb>, usize);
/// Owned byte string, used for the keys and values handed back by the database.
pub type Value = Vec<u8>;
/// Boxed iterator over `(key, value)` pairs, each wrapped in a `Result`.
pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value, Value)>> + 'a>;
/// Boxed iterator over `(key, value)` pairs, each wrapped in a `TxOpResult`
/// (the variant returned by operations on a `Transaction`).
pub type TxValueIter<'a> = Box<dyn std::iter::Iterator<Item = TxOpResult<(Value, Value)>> + 'a>;
// ----
/// Database error carrying a message, which is also its `Display` output.
#[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
/// Result type whose error is the database `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Error returned by a single operation performed on a `Transaction`.
///
/// Wraps the underlying database `Error`; it converts into `TxError::Db`
/// through `From`, so `?` can be used inside a transaction closure.
#[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct TxOpError(pub(crate) Error);
/// Result type of a single operation performed on a `Transaction`.
pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
/// Reason a transaction did not produce a value.
pub enum TxError<E> {
/// The transaction closure gave up, returning its own error value.
Abort(E),
/// The database reported an error.
Db(Error),
}
/// Result type returned by a transaction closure and by `Db::transaction`.
pub type TxResult<R, E> = std::result::Result<R, TxError<E>>;
impl<E> From<TxOpError> for TxError<E> {
fn from(e: TxOpError) -> TxError<E> {
TxError::Db(e.0)
}
}
/// Moves an abort out of the error channel of a transaction result.
///
/// A database error stays an error (as `TxOpError`); both a success and an
/// abort become `Ok`, carrying `Ok(value)` or `Err(abort_value)` respectively.
pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> {
    let inner = match res {
        Err(TxError::Db(db_error)) => return Err(TxOpError(db_error)),
        Err(TxError::Abort(reason)) => Err(reason),
        Ok(value) => Ok(value),
    };
    Ok(inner)
}
// ----
impl Db {
    /// Returns the engine name reported by the backend.
    pub fn engine(&self) -> String {
        self.0.engine()
    }

    /// Opens the tree called `name` and returns a handle to it.
    ///
    /// # Errors
    /// Propagates the backend's error if the tree cannot be opened.
    pub fn open_tree<S: AsRef<str>>(&self, name: S) -> Result<Tree> {
        let tree_id = self.0.open_tree(name.as_ref())?;
        Ok(Tree(self.0.clone(), tree_id))
    }

    /// Returns the tree names reported by the backend.
    pub fn list_trees(&self) -> Result<Vec<String>> {
        self.0.list_trees()
    }

    /// Runs `fun` inside a backend transaction and returns its result.
    ///
    /// On success, the callbacks registered through `Transaction::on_commit`
    /// are run before the value is returned. If the closure aborts, its abort
    /// value is returned as `TxError::Abort`. Database errors are returned as
    /// `TxError::Db`.
    ///
    /// NOTE(review): `fun` is `Fn` rather than `FnOnce`, presumably so that a
    /// backend may call it more than once — confirm against the adapters.
    ///
    /// # Panics
    /// Panics if the backend reports success or abort without having run the
    /// closure, or if the backend's outcome contradicts the stored result.
    pub fn transaction<R, E, F>(&self, fun: F) -> TxResult<R, E>
    where
        F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
    {
        let f = TxFn {
            function: fun,
            result: Cell::new(None),
        };
        let tx_res = self.0.transaction(&f);
        let ret = match f.result.into_inner() {
            Some(ret) => ret,
            None => match tx_res {
                // The backend failed without ever calling the closure
                // (presumably it could not start the transaction): report
                // that database error instead of panicking.
                Err(TxError::Db(e)) => return Err(TxError::Db(e)),
                _ => panic!("Transaction did not store result"),
            },
        };
        match tx_res {
            Ok(on_commit) => match ret {
                Ok(value) => {
                    on_commit.into_iter().for_each(|f| f());
                    Ok(value)
                }
                _ => unreachable!(),
            },
            Err(TxError::Abort(())) => match ret {
                Err(TxError::Abort(e)) => Err(TxError::Abort(e)),
                _ => unreachable!(),
            },
            Err(TxError::Db(e2)) => match ret {
                // Ok was stored -> the error occurred when finalizing
                // the transaction
                Ok(_) => Err(TxError::Db(e2)),
                // An error was already stored: that's the one we want to
                // return
                Err(TxError::Db(e)) => Err(TxError::Db(e)),
                _ => unreachable!(),
            },
        }
    }

    /// Copies every tree of `other` into this database.
    ///
    /// Each tree is copied inside a single transaction, and progress is
    /// printed to stdout every 1000 items and at the end of each tree.
    ///
    /// # Errors
    /// Fails if this database already lists any tree, if a freshly opened
    /// destination tree is not empty, or if any read or write fails.
    pub fn import(&self, other: &Db) -> Result<()> {
        let existing_trees = self.list_trees()?;
        if !existing_trees.is_empty() {
            return Err(Error(
                format!(
                    "destination database already contains data: {:?}",
                    existing_trees
                )
                .into(),
            ));
        }
        let tree_names = other.list_trees()?;
        for name in tree_names {
            let tree = self.open_tree(&name)?;
            if tree.len()? > 0 {
                return Err(Error(format!("tree {} already contains data", name).into()));
            }
            let ex_tree = other.open_tree(&name)?;
            let tx_res = self.transaction(|tx| {
                // Explicitly 64-bit: an unannotated counter would default to
                // `i32` and could overflow on very large trees.
                let mut i: u64 = 0;
                for item in ex_tree.iter().map_err(TxError::Abort)? {
                    let (k, v) = item.map_err(TxError::Abort)?;
                    tx.insert(&tree, k, v)?;
                    i += 1;
                    if i % 1000 == 0 {
                        println!("{}: imported {}", name, i);
                    }
                }
                Ok(i)
            });
            let total = match tx_res {
                Err(TxError::Db(e)) => return Err(e),
                Err(TxError::Abort(e)) => return Err(e),
                Ok(x) => x,
            };
            println!("{}: finished importing, {} items", name, total);
        }
        Ok(())
    }
}
/// List of supported database engine types
///
/// Each variant only exists when the cargo feature of the same name is enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Engine {
/// LMDB engine (requires the `lmdb` feature).
#[cfg(feature = "lmdb")]
Lmdb,
/// SQLite engine (requires the `sqlite` feature).
#[cfg(feature = "sqlite")]
Sqlite,
/// Sled engine (requires the `sled` feature).
#[cfg(feature = "sled")]
Sled,
}
impl std::fmt::Display for Engine {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
#[cfg(feature = "lmdb")]
Self::Lmdb => fmt.write_str("lmdb"),
#[cfg(feature = "sqlite")]
Self::Sqlite => fmt.write_str("sqlite"),
#[cfg(feature = "sled")]
Self::Sled => fmt.write_str("sled"),
}
}
}
impl std::str::FromStr for Engine {
type Err = Error;
fn from_str(text: &str) -> Result<Engine> {
match text {
#[cfg(feature = "lmdb")]
"lmdb" | "heed" => Ok(Self::Lmdb),
#[cfg(feature = "sqlite")]
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
#[cfg(feature = "sled")]
"sled" => Ok(Self::Sled),
kind => Err(Error(format!("Invalid DB engine: {}", kind).into())),
}
}
}
#[allow(clippy::len_without_is_empty)]
impl Tree {
#[inline]
pub fn db(&self) -> Db {
Db(self.0.clone())
}
#[inline]
pub fn get<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
self.0.get(self.1, key.as_ref())
}
#[inline]
pub fn len(&self) -> Result<usize> {
self.0.len(self.1)
}
#[inline]
pub fn fast_len(&self) -> Result<Option<usize>> {
self.0.fast_len(self.1)
}
#[inline]
pub fn first(&self) -> Result<Option<(Value, Value)>> {
self.iter()?.next().transpose()
}
#[inline]
pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value, Value)>> {
self.range((Bound::Excluded(from), Bound::Unbounded))?
.next()
.transpose()
}
/// Returns the old value if there was one
#[inline]
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
&self,
key: T,
value: U,
) -> Result<Option<Value>> {
self.0.insert(self.1, key.as_ref(), value.as_ref())
}
/// Returns the old value if there was one
#[inline]
pub fn remove<T: AsRef<[u8]>>(&self, key: T) -> Result<Option<Value>> {
self.0.remove(self.1, key.as_ref())
}
/// Clears all values from the tree
#[inline]
pub fn clear(&self) -> Result<()> {
self.0.clear(self.1)
}
#[inline]
pub fn iter(&self) -> Result<ValueIter<'_>> {
self.0.iter(self.1)
}
#[inline]
pub fn iter_rev(&self) -> Result<ValueIter<'_>> {
self.0.iter_rev(self.1)
}
#[inline]
pub fn range<K, R>(&self, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range(self.1, get_bound(sb), get_bound(eb))
}
#[inline]
pub fn range_rev<K, R>(&self, range: R) -> Result<ValueIter<'_>>
where
K: AsRef<[u8]>,
R: RangeBounds<K>,
{
let sb = range.start_bound();
let eb = range.end_bound();
self.0.range_rev(self.1, get_bound(sb), get_bound(eb))
}
}
#[allow(clippy::len_without_is_empty)]
impl<'a> Transaction<'a> {
    /// Looks up `key` in `tree` through the backend transaction.
    #[inline]
    pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
        let tree_id = tree.1;
        self.tx.get(tree_id, key.as_ref())
    }

    /// Returns the length of `tree` as reported by the backend transaction.
    #[inline]
    pub fn len(&self, tree: &Tree) -> TxOpResult<usize> {
        let tree_id = tree.1;
        self.tx.len(tree_id)
    }

    /// Returns the old value if there was one
    #[inline]
    pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(
        &mut self,
        tree: &Tree,
        key: T,
        value: U,
    ) -> TxOpResult<Option<Value>> {
        let tree_id = tree.1;
        self.tx.insert(tree_id, key.as_ref(), value.as_ref())
    }

    /// Returns the old value if there was one
    #[inline]
    pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> {
        let tree_id = tree.1;
        self.tx.remove(tree_id, key.as_ref())
    }

    /// Returns the backend transaction's iterator over `tree`.
    #[inline]
    pub fn iter(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
        let tree_id = tree.1;
        self.tx.iter(tree_id)
    }

    /// Returns the backend transaction's reverse iterator over `tree`.
    #[inline]
    pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<TxValueIter<'_>> {
        let tree_id = tree.1;
        self.tx.iter_rev(tree_id)
    }

    /// Returns the backend transaction's iterator over `tree`, restricted to
    /// the key bounds of `range`.
    #[inline]
    pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
    where
        K: AsRef<[u8]>,
        R: RangeBounds<K>,
    {
        let low = get_bound(range.start_bound());
        let high = get_bound(range.end_bound());
        self.tx.range(tree.1, low, high)
    }

    /// Returns the backend transaction's reverse iterator over `tree`,
    /// restricted to the key bounds of `range`.
    #[inline]
    pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<TxValueIter<'_>>
    where
        K: AsRef<[u8]>,
        R: RangeBounds<K>,
    {
        let low = get_bound(range.start_bound());
        let high = get_bound(range.end_bound());
        self.tx.range_rev(tree.1, low, high)
    }

    /// Registers `f` in this transaction's list of commit callbacks.
    #[inline]
    pub fn on_commit<F: FnOnce() + 'static>(&mut self, f: F) {
        let callback: Box<dyn FnOnce()> = Box::new(f);
        self.on_commit.push(callback);
    }
}
// ---- Internal interfaces
/// Interface each database backend implements; `Db` and `Tree` forward
/// their calls to it. Trees are addressed by the `usize` identifier
/// returned from `open_tree`.
pub(crate) trait IDb: Send + Sync {
/// Name of the engine behind this backend.
fn engine(&self) -> String;
/// Opens the tree called `name` and returns its identifier.
fn open_tree(&self, name: &str) -> Result<usize>;
/// Lists tree names.
fn list_trees(&self) -> Result<Vec<String>>;
/// Looks up `key` in `tree`.
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
/// Length of `tree`.
fn len(&self, tree: usize) -> Result<usize>;
/// Optional length query; the default implementation returns `Ok(None)`.
fn fast_len(&self, _tree: usize) -> Result<Option<usize>> {
Ok(None)
}
/// Stores `value` under `key`; the public wrappers document the return
/// value as the old value, if there was one.
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>;
/// Removes `key`; the public wrappers document the return value as the
/// old value, if there was one.
fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>;
/// Clears all values from `tree`.
fn clear(&self, tree: usize) -> Result<()>;
/// Iterates over `tree`.
fn iter(&self, tree: usize) -> Result<ValueIter<'_>>;
/// Iterates over `tree` in reverse.
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>;
/// Iterates over the keys of `tree` between the bounds `low` and `high`.
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
/// Same as `range`, in reverse.
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>>;
/// Runs `f` in a transaction. On success returns the commit callbacks
/// that `f` handed back; the closure's actual result is not returned here,
/// the caller reads it from the `ITxFn` implementor.
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()>;
}
/// Interface of a backend transaction; `Transaction` forwards its calls
/// to it. Mirrors the read/write operations of `IDb`, with `TxOpResult`
/// as the result type.
pub(crate) trait ITx {
/// Looks up `key` in `tree`.
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
/// Length of `tree`.
fn len(&self, tree: usize) -> TxOpResult<usize>;
/// Stores `value` under `key`; the public wrapper documents the return
/// value as the old value, if there was one.
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>;
/// Removes `key`; the public wrapper documents the return value as the
/// old value, if there was one.
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>;
/// Iterates over `tree`.
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
/// Iterates over `tree` in reverse.
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>>;
/// Iterates over the keys of `tree` between the bounds `low` and `high`.
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>>;
/// Same as `range`, in reverse.
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>>;
}
/// Type-erased transaction body that a backend can run against its own
/// `ITx` implementation.
pub(crate) trait ITxFn {
/// Runs the body on `tx` and reports how it ended.
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult;
}
/// How a transaction body ended, as reported by `ITxFn::try_on`.
/// It carries no result value or error payload.
pub(crate) enum TxFnResult {
/// The body succeeded; holds the commit callbacks it registered.
Ok(OnCommit),
/// The body returned `TxError::Abort`.
Abort,
/// The body returned `TxError::Db`.
DbErr,
}
/// Adapter that lets a typed transaction closure be passed to a backend as
/// `&dyn ITxFn`, keeping the closure's typed result for the caller.
struct TxFn<F, R, E>
where
F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
// The user-supplied transaction closure.
function: F,
// Result of the most recent run of `function`; `None` until it has run.
result: Cell<Option<TxResult<R, E>>>,
}
impl<F, R, E> ITxFn for TxFn<F, R, E>
where
    F: Fn(&mut Transaction<'_>) -> TxResult<R, E>,
{
    /// Runs the wrapped closure on `tx`, stores its typed result in
    /// `self.result`, and returns a payload-free summary of the outcome.
    fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
        let mut handle = Transaction {
            tx,
            on_commit: Vec::new(),
        };
        let outcome = (self.function)(&mut handle);
        // Classify by reference so the full result can still be stored below.
        let status = match outcome.as_ref() {
            Ok(_) => TxFnResult::Ok(handle.on_commit),
            Err(TxError::Db(_)) => TxFnResult::DbErr,
            Err(TxError::Abort(_)) => TxFnResult::Abort,
        };
        self.result.set(Some(outcome));
        status
    }
}
// ----
/// Converts a bound on a borrowed key into the same kind of bound on the
/// key's byte-slice view.
fn get_bound<K: AsRef<[u8]>>(b: Bound<&K>) -> Bound<&[u8]> {
    match b {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Excluded(key) => Bound::Excluded(key.as_ref()),
        Bound::Included(key) => Bound::Included(key.as_ref()),
    }
}