block refcount repair #782

Merged
lx merged 3 commits from block-ref-repair into next-0.10 2024-03-19 15:59:20 +00:00
4 changed files with 18 additions and 18 deletions
Showing only changes of commit 3165ab926c - Show all commits

View file

@ -156,7 +156,7 @@ impl BlockManager {
let metrics = BlockManagerMetrics::new( let metrics = BlockManagerMetrics::new(
config.compression_level, config.compression_level,
rc.rc.clone(), rc.rc_table.clone(),
resync.queue.clone(), resync.queue.clone(),
resync.errors.clone(), resync.errors.clone(),
); );
@ -387,7 +387,7 @@ impl BlockManager {
/// Get number of items in the refcount table /// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> { pub fn rc_len(&self) -> Result<usize, Error> {
Ok(self.rc.rc.len()?) Ok(self.rc.rc_table.len()?)
} }
/// Send command to start/stop/manager scrub worker /// Send command to start/stop/manager scrub worker

View file

@ -14,14 +14,14 @@ pub type CalculateRefcount =
Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>; Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
pub struct BlockRc { pub struct BlockRc {
pub rc: db::Tree, pub rc_table: db::Tree,
pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>, pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
} }
impl BlockRc { impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self { pub(crate) fn new(rc: db::Tree) -> Self {
Self { Self {
rc, rc_table: rc,
recalc_rc: ArcSwapOption::new(None), recalc_rc: ArcSwapOption::new(None),
} }
} }
@ -33,9 +33,9 @@ impl BlockRc {
tx: &mut db::Transaction, tx: &mut db::Transaction,
hash: &Hash, hash: &Hash,
) -> db::TxOpResult<bool> { ) -> db::TxOpResult<bool> {
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match old_rc.increment().serialize() { match old_rc.increment().serialize() {
Some(x) => tx.insert(&self.rc, hash, x)?, Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => unreachable!(), None => unreachable!(),
}; };
Ok(old_rc.is_zero()) Ok(old_rc.is_zero())
@ -48,28 +48,28 @@ impl BlockRc {
tx: &mut db::Transaction, tx: &mut db::Transaction,
hash: &Hash, hash: &Hash,
) -> db::TxOpResult<bool> { ) -> db::TxOpResult<bool> {
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement(); let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement();
match new_rc.serialize() { match new_rc.serialize() {
Some(x) => tx.insert(&self.rc, hash, x)?, Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => tx.remove(&self.rc, hash)?, None => tx.remove(&self.rc_table, hash)?,
}; };
Ok(matches!(new_rc, RcEntry::Deletable { .. })) Ok(matches!(new_rc, RcEntry::Deletable { .. }))
} }
/// Read a block's reference count /// Read a block's reference count
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> { pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?))
} }
/// Delete an entry in the RC table if it is deletable and the /// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed /// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec(); let now = now_msec();
self.rc.db().transaction(|tx| { self.rc_table.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match rcval { match rcval {
RcEntry::Deletable { at_time } if now > at_time => { RcEntry::Deletable { at_time } if now > at_time => {
tx.remove(&self.rc, hash)?; tx.remove(&self.rc_table, hash)?;
} }
_ => (), _ => (),
}; };
@ -84,14 +84,14 @@ impl BlockRc {
if let Some(recalc_fns) = self.recalc_rc.load().as_ref() { if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
trace!("Repair block RC for {:?}", hash); trace!("Repair block RC for {:?}", hash);
let res = self let res = self
.rc .rc_table
.db() .db()
.transaction(|tx| { .transaction(|tx| {
let mut cnt = 0; let mut cnt = 0;
for f in recalc_fns.iter() { for f in recalc_fns.iter() {
cnt += f(&tx, hash)?; cnt += f(&tx, hash)?;
} }
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
trace!( trace!(
"Block RC for {:?}: stored={}, calculated={}", "Block RC for {:?}: stored={}, calculated={}",
hash, hash,
@ -112,7 +112,7 @@ impl BlockRc {
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
} }
}; };
tx.insert(&self.rc, hash, new_rc.serialize().unwrap())?; tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?;
Ok((cnt, true)) Ok((cnt, true))
} else { } else {
Ok((cnt, false)) Ok((cnt, false))

View file

@ -107,7 +107,7 @@ impl Worker for RepairWorker {
for entry in self for entry in self
.manager .manager
.rc .rc
.rc .rc_table
.range::<&[u8], _>((start_bound, Bound::Unbounded))? .range::<&[u8], _>((start_bound, Bound::Unbounded))?
{ {
let (hash, _) = entry?; let (hash, _) = entry?;

View file

@ -337,7 +337,7 @@ impl Worker for BlockRcRepair {
let next1 = self let next1 = self
.block_manager .block_manager
.rc .rc
.rc .rc_table
.range(self.cursor.as_slice()..)? .range(self.cursor.as_slice()..)?
.next() .next()
.transpose()? .transpose()?