Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
5 changed files with 42 additions and 78 deletions
Showing only changes of commit 4bf706b170 - Show all commits

View file

@ -56,8 +56,6 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@ -549,7 +547,12 @@ impl BlockManager {
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_get_next()? {
let next = match self.resync_queue.first()? {
Some((k, v)) => Some((k.into_vec(), v.into_vec())),
None => None,
};
if let Some((time_bytes, hash_bytes)) = next {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@ -642,16 +645,6 @@ impl BlockManager {
}
}
fn resync_get_next(&self) -> Result<OptKVPair, db::Error> {
match self.resync_queue.iter()?.next() {
None => Ok(None),
Some(v) => {
let (time_bytes, hash_bytes) = v?;
Ok(Some((time_bytes.into_vec(), hash_bytes.into_vec())))
}
}
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = self
.mutation_lock

View file

@ -226,6 +226,15 @@ impl Tree {
self.0.len(self.1)
}
pub fn first(&self) -> Result<Option<(Value<'_>, Value<'_>)>> {
self.iter()?.next().transpose()
}
pub fn get_gt<T: AsRef<[u8]>>(&self, from: T) -> Result<Option<(Value<'_>, Value<'_>)>> {
self.range((Bound::Excluded(from), Bound::Unbounded))?
.next()
.transpose()
}
pub fn insert<T: AsRef<[u8]>, U: AsRef<[u8]>>(&self, key: T, value: U) -> Result<()> {
self.0.insert(self.1, key.as_ref(), value.as_ref())
}

View file

@ -107,9 +107,9 @@ impl IDb for SqliteDb {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?;
trace!("get: lock db");
trace!("get {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("get: lock acquired");
trace!("get {}: lock acquired", tree);
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?;
@ -122,9 +122,9 @@ impl IDb for SqliteDb {
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?;
trace!("remove: lock db");
trace!("remove {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("remove: lock acquired");
trace!("remove {}: lock acquired", tree);
let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
Ok(res > 0)
@ -133,9 +133,9 @@ impl IDb for SqliteDb {
fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?;
trace!("len: lock db");
trace!("len {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("len: lock acquired");
trace!("len {}: lock acquired", tree);
let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?;
@ -148,9 +148,9 @@ impl IDb for SqliteDb {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?;
trace!("insert: lock db");
trace!("insert {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("insert: lock acquired");
trace!("insert {}: lock acquired", tree);
db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),

View file

@ -1,4 +1,3 @@
use core::ops::Bound;
use std::sync::Arc;
use tokio::sync::watch;
@ -16,8 +15,6 @@ pub struct Repair {
pub garage: Arc<Garage>,
}
type OptKVPair = Option<(Vec<u8>, Vec<u8>)>;
impl Repair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
@ -68,8 +65,15 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.get_next_version_after(&pos)? {
pos = item_key;
while *must_exit.borrow() {
let item_bytes = {
let (k, v) = match self.garage.version_table.data.store.get_gt(pos)? {
Some(pair) => pair,
None => break,
};
pos = k.into_vec();
v.into_vec()
};
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
@ -99,36 +103,22 @@ impl Repair {
))
.await?;
}
if *must_exit.borrow() {
break;
}
}
Ok(())
}
fn get_next_version_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
match self
.garage
.version_table
.data
.store
.range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
None => Ok(None),
Some(item) => {
let (item_key, item_bytes) = item?;
Ok(Some((item_key.into_vec(), item_bytes.into_vec())))
}
}
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.get_next_block_ref_after(&pos)? {
pos = item_key;
while *must_exit.borrow() {
let item_bytes = {
let (k, v) = match self.garage.block_ref_table.data.store.get_gt(pos)? {
Some(pair) => pair,
None => break,
};
pos = k.into_vec();
v.into_vec()
};
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
@ -155,29 +145,7 @@ impl Repair {
})
.await?;
}
if *must_exit.borrow() {
break;
}
}
Ok(())
}
#[allow(clippy::type_complexity)]
fn get_next_block_ref_after(&self, pos: &[u8]) -> Result<OptKVPair, Error> {
match self
.garage
.block_ref_table
.data
.store
.range::<&[u8], _>((Bound::Excluded(pos), Bound::Unbounded))?
.next()
{
None => Ok(None),
Some(item) => {
let (item_key, item_bytes) = item?;
Ok(Some((item_key.into_vec(), item_bytes.into_vec())))
}
}
}
}

View file

@ -110,13 +110,7 @@ where
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
// TODO undo this iter hack
let mut iter = self.data.merkle_todo.iter()?;
if let Some(x) = iter.next() {
let (key, valhash) = x?;
let key = key.to_vec();
let valhash = valhash.to_vec();
drop(iter);
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
Ok(true)
} else {