Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
8 changed files with 165 additions and 106 deletions
Showing only changes of commit c439cb11a9 - Show all commits

43
Cargo.lock generated
View file

@ -2,12 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "ahash"
version = "0.7.6"
@ -28,12 +22,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "anyhow"
version = "1.0.56"
@ -1028,7 +1016,6 @@ version = "0.8.0"
dependencies = [
"err-derive 0.3.1",
"mktemp",
"ouroboros",
"rusqlite",
"sled",
]
@ -2211,30 +2198,6 @@ version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
[[package]]
name = "ouroboros"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f31a3b678685b150cba82b702dcdc5e155893f63610cf388d30cd988d4ca2bf"
dependencies = [
"aliasable",
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "084fd65d5dd8b3772edccb5ffd1e4b7eba43897ecd0f9401e330e8c542959408"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -3104,12 +3067,6 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_init"
version = "1.0.2"

View file

@ -547,9 +547,7 @@ impl BlockManager {
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
if let Some(first_pair_res) = self.resync_queue.iter()?.next() {
let (time_bytes, hash_bytes) = first_pair_res?;
if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@ -642,6 +640,16 @@ impl BlockManager {
}
}
fn resync_get_next(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, db::Error> {
match self.resync_queue.iter()?.next() {
None => Ok(None),
Some(v) => {
let (time_bytes, hash_bytes) = v?;
Ok(Some((time_bytes.into_owned(), hash_bytes.into_owned())))
}
}
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = self
.mutation_lock

View file

@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
err-derive = "0.3"
ouroboros = "0.15"
sled = "0.34"
rusqlite = "0.27"

View file

@ -22,11 +22,9 @@ pub struct Transaction<'a>(pub(crate) &'a dyn ITx<'a>);
pub struct Tree(pub(crate) Arc<dyn IDb>, pub(crate) usize);
pub type Value<'a> = Cow<'a, [u8]>;
pub type ValueIter<'a> =
Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + Send + 'a>;
pub type ValueIter<'a> = Box<dyn std::iter::Iterator<Item = Result<(Value<'a>, Value<'a>)>> + 'a>;
pub type Exporter<'a> =
Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>;
pub type Exporter<'a> = Box<dyn std::iter::Iterator<Item = Result<(String, ValueIter<'a>)>> + 'a>;
// ----

View file

@ -1,11 +1,12 @@
use core::ops::Bound;
use std::cell::Cell;
use std::sync::{Arc, Mutex, RwLock, MutexGuard};
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use ouroboros::self_referencing;
use rusqlite::{params, Connection, Transaction};
use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{
Db, Error, Exporter, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter,
@ -114,16 +115,14 @@ impl IDb for SqliteDb {
fn iter<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
let db = self.db.lock().unwrap();
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
let mut stmt = db.prepare(&sql)?;
let res = stmt.query([])?;
unimplemented!();
DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
}
fn iter_rev<'a>(&'a self, tree: usize) -> Result<ValueIter<'a>> {
let tree = self.get_tree(tree)?;
unimplemented!();
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
DbValueIterator::new(self.db.lock().unwrap(), &sql, [])
}
fn range<'a, 'r>(
@ -263,3 +262,80 @@ impl<'a> ITx<'a> for SqliteTx<'a> {
}
}
// ----
struct DbValueIterator<'a> {
db: Option<MutexGuard<'a, Connection>>,
stmt: Option<Statement<'a>>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
}
impl<'a> DbValueIterator<'a> {
fn new<P: rusqlite::Params>(
db: MutexGuard<'a, Connection>,
sql: &str,
args: P,
) -> Result<ValueIter<'a>> {
let res = DbValueIterator {
db: Some(db),
stmt: None,
iter: None,
_pin: PhantomPinned,
};
let mut boxed = Box::pin(res);
unsafe {
let db = NonNull::from(&boxed.db);
let stmt = db.as_ref().as_ref().unwrap().prepare(&sql)?;
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
let mut stmt = NonNull::from(&boxed.stmt);
let iter = stmt.as_mut().as_mut().unwrap().query(args)?;
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
}
Ok(Box::new(DbValueIteratorPin(boxed)))
}
}
struct DbValueIteratorPin<'a>(Pin<Box<DbValueIterator<'a>>>);
impl<'a> Iterator for DbValueIteratorPin<'a> {
type Item = Result<(Value<'a>, Value<'a>)>;
fn next(&mut self) -> Option<Self::Item> {
let next = unsafe {
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
};
let row = match next {
Err(e) => return Some(Err(e.into())),
Ok(None) => {
// finished, drop everything
unsafe {
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut self.0);
let t = Pin::get_unchecked_mut(mut_ref);
drop(t.iter.take());
drop(t.stmt.take());
drop(t.db.take());
}
return None;
}
Ok(Some(r)) => r,
};
let k = match row.get::<_, Vec<u8>>(0) {
Err(e) => return Some(Err(e.into())),
Ok(x) => x,
};
let v = match row.get::<_, Vec<u8>>(1) {
Err(e) => return Some(Err(e.into())),
Ok(y) => y,
};
Some(Ok((k.into(), v.into())))
}
}

View file

@ -66,18 +66,10 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some(item) = self
.garage
.version_table
.data
.store
.range((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
let (item_key, item_bytes) = item?;
pos = item_key.to_vec();
while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? {
pos = item_key;
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
continue;
}
@ -113,22 +105,30 @@ impl Repair {
Ok(())
}
fn get_next_version_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
match self
.garage
.version_table
.data
.store
.range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
None => Ok(None),
Some(item) => {
let (item_key, item_bytes) = item?;
Ok(Some((item_key.into_owned(), item_bytes.into_owned())))
}
}
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some(item) = self
.garage
.block_ref_table
.data
.store
.range((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
let (item_key, item_bytes) = item?;
while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? {
pos = item_key;
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
continue;
}
@ -160,4 +160,21 @@ impl Repair {
}
Ok(())
}
fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>, Error> {
match self
.garage
.block_ref_table
.data
.store
.range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
None => Ok(None),
Some(item) => {
let (item_key, item_bytes) = item?;
Ok(Some((item_key.into_owned(), item_bytes.into_owned())))
}
}
}
}

View file

@ -25,11 +25,15 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;
old_buckets.push(bucket);
}
for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}

View file

@ -89,33 +89,33 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
if let Some(x) = self.data.merkle_todo.iter().unwrap().next() {
// TODO unwrap to remove
match x {
Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!(
"({}) Error while updating Merkle tree item: {}",
F::TABLE_NAME,
e
);
match self.updater_loop_iter() {
Ok(true) => (),
Ok(false) => {
select! {
_ = self.data.merkle_todo_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Err(e) => {
warn!(
"({}) Error while iterating on Merkle todo tree: {}",
"({}) Error while updating Merkle tree item: {}",
F::TABLE_NAME,
e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
if let Some(x) = self.data.merkle_todo.iter()?.next() {
let (key, valhash) = x?;
self.update_item(&key[..], &valhash[..])?;
Ok(true)
} else {
select! {
_ = self.data.merkle_todo_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Ok(false)
}
}