Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
4 changed files with 70 additions and 40 deletions
Showing only changes of commit 1dabd98330 - Show all commits

View file

@ -1,3 +1,5 @@
use core::ops::Bound;
use std::convert::TryInto; use std::convert::TryInto;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
@ -218,19 +220,35 @@ impl BlockManager {
/// to fix any mismatch between the two. /// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table. // 1. Repair blocks from RC table.
// TODO don't do this like this let mut next_start: Option<Hash> = None;
let mut hashes = vec![]; loop {
for (i, entry) in self.rc.rc.iter()?.enumerate() { let mut batch_of_hashes = vec![];
let (hash, _) = entry?; let start_bound = match next_start.as_ref() {
let hash = Hash::try_from(&hash[..]).unwrap(); None => Bound::Unbounded,
hashes.push(hash); Some(x) => Bound::Excluded(x.as_slice()),
if i & 0xFF == 0 && *must_exit.borrow() { };
return Ok(()); for entry in self
.rc
.rc
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
batch_of_hashes.push(hash);
if batch_of_hashes.len() >= 1000 {
break;
}
} }
} if batch_of_hashes.is_empty() {
for (i, hash) in hashes.into_iter().enumerate() { break;
self.put_to_resync(&hash, Duration::from_secs(0))?; }
if i & 0xFF == 0 && *must_exit.borrow() {
for hash in batch_of_hashes.into_iter() {
self.put_to_resync(&hash, Duration::from_secs(0))?;
next_start = Some(hash)
}
if *must_exit.borrow() {
return Ok(()); return Ok(());
} }
} }
@ -271,18 +289,18 @@ impl BlockManager {
} }
/// Get lenght of resync queue /// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> Result<usize, Error> {
self.resync_queue.len().unwrap() // TODO fix unwrap Ok(self.resync_queue.len()?)
} }
/// Get number of blocks that have an error /// Get number of blocks that have an error
pub fn resync_errors_len(&self) -> usize { pub fn resync_errors_len(&self) -> Result<usize, Error> {
self.resync_errors.len().unwrap() // TODO fix unwrap Ok(self.resync_errors.len()?)
} }
/// Get number of items in the refcount table /// Get number of items in the refcount table
pub fn rc_len(&self) -> usize { pub fn rc_len(&self) -> Result<usize, Error> {
self.rc.rc.len().unwrap() // TODO fix unwrap Ok(self.rc.rc.len()?)
} }
//// ----- Managing the reference counter ---- //// ----- Managing the reference counter ----

View file

@ -660,11 +660,11 @@ impl AdminRpcHandler {
} }
Ok(AdminRpc::Ok(ret)) Ok(AdminRpc::Ok(ret))
} else { } else {
Ok(AdminRpc::Ok(self.gather_stats_local(opt))) Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
} }
} }
fn gather_stats_local(&self, opt: StatsOpt) -> String { fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new(); let mut ret = String::new();
writeln!( writeln!(
&mut ret, &mut ret,
@ -689,59 +689,71 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap(); writeln!(&mut ret, " {:?} {}", n, c).unwrap();
} }
self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap(); writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed { if opt.detailed {
writeln!( writeln!(
&mut ret, &mut ret,
" number of RC entries (~= number of blocks): {}", " number of RC entries (~= number of blocks): {}",
self.garage.block_manager.rc_len() self.garage.block_manager.rc_len()?
) )
.unwrap(); .unwrap();
} }
writeln!( writeln!(
&mut ret, &mut ret,
" resync queue length: {}", " resync queue length: {}",
self.garage.block_manager.resync_queue_len() self.garage.block_manager.resync_queue_len()?
) )
.unwrap(); .unwrap();
writeln!( writeln!(
&mut ret, &mut ret,
" blocks with resync errors: {}", " blocks with resync errors: {}",
self.garage.block_manager.resync_errors_len() self.garage.block_manager.resync_errors_len()?
) )
.unwrap(); .unwrap();
ret Ok(ret)
} }
fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt) fn gather_table_stats<F, R>(
&self,
to: &mut String,
t: &Arc<Table<F, R>>,
opt: &StatsOpt,
) -> Result<(), Error>
where where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed { if opt.detailed {
writeln!(to, " number of items: {}", t.data.store.len().unwrap()).unwrap(); // TODO fix len unwrap writeln!(
to,
" number of items: {}",
t.data.store.len().map_err(GarageError::from)?
)
.unwrap();
writeln!( writeln!(
to, to,
" Merkle tree size: {}", " Merkle tree size: {}",
t.merkle_updater.merkle_tree_len() t.merkle_updater.merkle_tree_len()?
) )
.unwrap(); .unwrap();
} }
writeln!( writeln!(
to, to,
" Merkle updater todo queue length: {}", " Merkle updater todo queue length: {}",
t.merkle_updater.todo_len() t.merkle_updater.todo_len()?
) )
.unwrap(); .unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
Ok(())
} }
} }

View file

@ -318,7 +318,7 @@ where
} }
} }
pub fn gc_todo_len(&self) -> usize { pub fn gc_todo_len(&self) -> Result<usize, Error> {
self.gc_todo.len().unwrap() // TODO fix unwrap Ok(self.gc_todo.len()?)
} }
} }

View file

@ -316,12 +316,12 @@ where
MerkleNode::decode_opt(&ent) MerkleNode::decode_opt(&ent)
} }
pub fn merkle_tree_len(&self) -> usize { pub fn merkle_tree_len(&self) -> Result<usize, Error> {
self.data.merkle_tree.len().unwrap() // TODO fix unwrap Ok(self.data.merkle_tree.len()?)
} }
pub fn todo_len(&self) -> usize { pub fn todo_len(&self) -> Result<usize, Error> {
self.data.merkle_todo.len().unwrap() // TODO fix unwrap Ok(self.data.merkle_todo.len()?)
} }
} }