Make block rc code more understandable

This commit is contained in:
Alex 2021-03-15 18:27:26 +01:00
parent 3bf2df622a
commit 0290afe1f8
3 changed files with 40 additions and 34 deletions

View file

@ -423,6 +423,12 @@ impl AdminRpcHandler {
self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
writeln!(
&mut ret,
" number of blocks: {}",
self.garage.block_manager.rc.len()
)
.unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
@ -451,6 +457,18 @@ impl AdminRpcHandler {
t.data.merkle_updater.todo.len()
)
.unwrap();
writeln!(
to,
" Merkle tree size: {}",
t.data.merkle_updater.merkle_tree.len()
)
.unwrap();
writeln!(
to,
" GC todo queue length: {}",
t.data.gc_todo.len()
)
.unwrap();
Ok(())
}
}

View file

@ -77,7 +77,6 @@ impl BlockManager {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
rc.set_merge_operator(rc_merge);
let resync_queue = db
.open_tree("block_local_resync_queue")
@ -194,7 +193,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
.map(|x| u64_from_bytes(x.as_ref()) > 0)
.map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if needed {
let path = self.block_path(hash);
@ -218,17 +217,27 @@ impl BlockManager {
}
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.get(&hash)?;
self.rc.merge(&hash, vec![1])?;
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
Some(u64::to_be_bytes(old_v + 1).to_vec())
})?;
let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
if old_rc == 0 {
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
Ok(())
}
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.merge(&hash, vec![0])?;
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
if old_v > 1 {
Some(u64::to_be_bytes(old_v - 1).to_vec())
} else {
None
}
})?;
if new_rc.is_none() {
self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
}
Ok(())
@ -251,7 +260,7 @@ impl BlockManager {
let mut n_failures = 0usize;
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_bytes(&time_bytes[0..8]);
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
@ -295,7 +304,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
.map(|x| u64_from_bytes(x.as_ref()) > 0)
.map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if exists != needed {
@ -487,30 +496,9 @@ impl BlockManager {
}
}
fn u64_from_bytes(bytes: &[u8]) -> u64 {
assert!(bytes.len() == 8);
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
x8.copy_from_slice(bytes);
x8.copy_from_slice(bytes.as_ref());
u64::from_be_bytes(x8)
}
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
let old = old.map(u64_from_bytes).unwrap_or(0);
assert!(new.len() == 1);
let new = match new[0] {
0 => {
if old > 0 {
old - 1
} else {
0
}
}
1 => old + 1,
_ => unreachable!(),
};
if new == 0 {
None
} else {
Some(u64::to_be_bytes(new).to_vec())
}
}

View file

@ -46,7 +46,7 @@ pub struct MerkleUpdater {
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
pub(crate) merkle_tree: sled::Tree,
pub merkle_tree: sled::Tree,
empty_node_hash: Hash,
}