diff --git a/src/block/manager.rs b/src/block/manager.rs index b7dcaf8a..ea984646 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,5 @@ +use core::ops::Bound; + use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -218,19 +220,35 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { // 1. Repair blocks from RC table. - // TODO don't do this like this - let mut hashes = vec![]; - for (i, entry) in self.rc.rc.iter()?.enumerate() { - let (hash, _) = entry?; - let hash = Hash::try_from(&hash[..]).unwrap(); - hashes.push(hash); - if i & 0xFF == 0 && *must_exit.borrow() { - return Ok(()); + let mut next_start: Option = None; + loop { + let mut batch_of_hashes = vec![]; + let start_bound = match next_start.as_ref() { + None => Bound::Unbounded, + Some(x) => Bound::Excluded(x.as_slice()), + }; + for entry in self + .rc + .rc + .range::<&[u8], _>((start_bound, Bound::Unbounded))? + { + let (hash, _) = entry?; + let hash = Hash::try_from(&hash[..]).unwrap(); + batch_of_hashes.push(hash); + if batch_of_hashes.len() >= 1000 { + break; + } } - } - for (i, hash) in hashes.into_iter().enumerate() { - self.put_to_resync(&hash, Duration::from_secs(0))?; - if i & 0xFF == 0 && *must_exit.borrow() { + if batch_of_hashes.is_empty() { + break; + } + + for hash in batch_of_hashes.into_iter() { + self.put_to_resync(&hash, Duration::from_secs(0))?; + next_start = Some(hash) + } + + if *must_exit.borrow() { return Ok(()); } } @@ -271,18 +289,18 @@ impl BlockManager { } /// Get lenght of resync queue - pub fn resync_queue_len(&self) -> usize { - self.resync_queue.len().unwrap() // TODO fix unwrap + pub fn resync_queue_len(&self) -> Result { + Ok(self.resync_queue.len()?) } /// Get number of blocks that have an error - pub fn resync_errors_len(&self) -> usize { - self.resync_errors.len().unwrap() // TODO fix unwrap + pub fn resync_errors_len(&self) -> Result { + Ok(self.resync_errors.len()?) } /// Get number of items in the refcount table - pub fn rc_len(&self) -> usize { - self.rc.rc.len().unwrap() // TODO fix unwrap + pub fn rc_len(&self) -> Result { + Ok(self.rc.rc.len()?) } //// ----- Managing the reference counter ---- diff --git a/src/garage/admin.rs b/src/garage/admin.rs index cce88b35..3af8b046 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -660,11 +660,11 @@ impl AdminRpcHandler { } Ok(AdminRpc::Ok(ret)) } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt))) + Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) } } - fn gather_stats_local(&self, opt: StatsOpt) -> String { + fn gather_stats_local(&self, opt: StatsOpt) -> Result { let mut ret = String::new(); writeln!( &mut ret, @@ -689,59 +689,71 @@ impl AdminRpcHandler { writeln!(&mut ret, " {:?} {}", n, c).unwrap(); } - self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.key_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.object_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.version_table, &opt); - self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt); + self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; writeln!(&mut ret, "\nBlock manager stats:").unwrap(); if opt.detailed { writeln!( &mut ret, " number of RC entries (~= number of blocks): {}", - self.garage.block_manager.rc_len() + self.garage.block_manager.rc_len()? ) .unwrap(); } writeln!( &mut ret, " resync queue length: {}", - self.garage.block_manager.resync_queue_len() + self.garage.block_manager.resync_queue_len()? ) .unwrap(); writeln!( &mut ret, " blocks with resync errors: {}", - self.garage.block_manager.resync_errors_len() + self.garage.block_manager.resync_errors_len()? ) .unwrap(); - ret + Ok(ret) } - fn gather_table_stats(&self, to: &mut String, t: &Arc>, opt: &StatsOpt) + fn gather_table_stats( + &self, + to: &mut String, + t: &Arc>, + opt: &StatsOpt, + ) -> Result<(), Error> where F: TableSchema + 'static, R: TableReplication + 'static, { writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap(); if opt.detailed { - writeln!(to, " number of items: {}", t.data.store.len().unwrap()).unwrap(); // TODO fix len unwrap + writeln!( + to, + " number of items: {}", + t.data.store.len().map_err(GarageError::from)? + ) + .unwrap(); writeln!( to, " Merkle tree size: {}", - t.merkle_updater.merkle_tree_len() + t.merkle_updater.merkle_tree_len()? ) .unwrap(); } writeln!( to, " Merkle updater todo queue length: {}", - t.merkle_updater.todo_len() + t.merkle_updater.todo_len()? ) .unwrap(); - writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); + writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap(); + + Ok(()) } } diff --git a/src/table/data.rs b/src/table/data.rs index e688168f..839dae94 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -318,7 +318,7 @@ where } } - pub fn gc_todo_len(&self) -> usize { - self.gc_todo.len().unwrap() // TODO fix unwrap + pub fn gc_todo_len(&self) -> Result { + Ok(self.gc_todo.len()?) } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e7f2442e..7685b193 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -316,12 +316,12 @@ where MerkleNode::decode_opt(&ent) } - pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len().unwrap() // TODO fix unwrap + pub fn merkle_tree_len(&self) -> Result { + Ok(self.data.merkle_tree.len()?) } - pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len().unwrap() // TODO fix unwrap + pub fn todo_len(&self) -> Result { + Ok(self.data.merkle_todo.len()?) } }