diff --git a/src/block/lib.rs b/src/block/lib.rs index 6c4711ef0..944f0d834 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -11,3 +11,4 @@ mod metrics; mod rc; pub use block::zstd_encode; +pub use rc::CalculateRefcount; diff --git a/src/block/manager.rs b/src/block/manager.rs index eeacf8b9b..628ffc713 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,7 +88,7 @@ pub struct BlockManager { mutation_lock: Vec>, - pub(crate) rc: BlockRc, + pub rc: BlockRc, pub resync: BlockResyncManager, pub(crate) system: Arc, @@ -229,6 +229,12 @@ impl BlockManager { } } + /// Initialization: set how block references are recalculated + /// for repair operations + pub fn set_recalc_rc(&self, recalc: Vec) { + self.rc.recalc_rc.store(Some(Arc::new(recalc))); + } + /// Ask nodes that might have a (possibly compressed) block for it /// Return it as a stream with a header async fn rpc_get_raw_block_streaming( @@ -316,9 +322,9 @@ impl BlockManager { }; } - let msg = format!("Get block {:?}: no node returned a valid block", hash); - debug!("{}", msg); - Err(Error::Message(msg)) + let err = Error::MissingBlock(*hash); + debug!("{}", err); + Err(err) } // ---- Public interface ---- diff --git a/src/block/rc.rs b/src/block/rc.rs index b6afb277a..bf5aeced6 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -1,5 +1,7 @@ use std::convert::TryInto; +use arc_swap::ArcSwapOption; + use garage_db as db; use garage_util::data::*; @@ -8,13 +10,20 @@ use garage_util::time::*; use crate::manager::BLOCK_GC_DELAY; +pub type CalculateRefcount = + Box db::TxResult + Send + Sync>; + pub struct BlockRc { - pub(crate) rc: db::Tree, + pub rc: db::Tree, + pub(crate) recalc_rc: ArcSwapOption>, } impl BlockRc { pub(crate) fn new(rc: db::Tree) -> Self { - Self { rc } + Self { + rc, + recalc_rc: ArcSwapOption::new(None), + } } /// Increment the reference counter associated to a hash. @@ -68,6 +77,58 @@ impl BlockRc { })?; Ok(()) } + + /// Recalculate the reference counter of a block + /// to fix potential inconsistencies + pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> { + if let Some(recalc_fns) = self.recalc_rc.load().as_ref() { + trace!("Repair block RC for {:?}", hash); + let res = self + .rc + .db() + .transaction(|tx| { + let mut cnt = 0; + for f in recalc_fns.iter() { + cnt += f(&tx, hash)?; + } + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); + trace!( + "Block RC for {:?}: stored={}, calculated={}", + hash, + old_rc.as_u64(), + cnt + ); + if cnt as u64 != old_rc.as_u64() { + warn!( + "Fixing inconsistent block RC for {:?}: was {}, should be {}", + hash, + old_rc.as_u64(), + cnt + ); + let new_rc = if cnt > 0 { + RcEntry::Present { count: cnt as u64 } + } else { + RcEntry::Deletable { + at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, + } + }; + tx.insert(&self.rc, hash, new_rc.serialize().unwrap())?; + Ok((cnt, true)) + } else { + Ok((cnt, false)) + } + }) + .map_err(Error::from); + if let Err(e) = &res { + error!("Failed to fix RC for block {:?}: {}", hash, e); + } + res + } else { + Err(Error::Message( + "Block RC recalculation is not available at this point".into(), + )) + } + } } /// Describes the state of the reference counter for a block diff --git a/src/block/resync.rs b/src/block/resync.rs index 48c2cef15..b41082133 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -367,6 +367,13 @@ impl BlockResyncManager { } if exists && rc.is_deletable() { + if manager.rc.recalculate_rc(hash)?.0 > 0 { + return Err(Error::Message(format!( + "Refcount for block {:?} was inconsistent, retrying later", + hash + ))); + } + info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); @@ -453,7 +460,15 @@ impl BlockResyncManager { hash ); - let block_data = manager.rpc_get_raw_block(hash, None).await?; + let block_data = manager.rpc_get_raw_block(hash, None).await; + if matches!(block_data, Err(Error::MissingBlock(_))) { + warn!( + "Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.", + hash + ); + manager.rc.recalculate_rc(hash)?; + } + let block_data = block_data?; manager.metrics.resync_recv_counter.add(1); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 1f572a9ad..8380b5e22 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -473,8 +473,11 @@ pub enum RepairWhat { #[structopt(name = "mpu", version = garage_version())] MultipartUploads, /// Repropagate version deletions to the block ref table - #[structopt(name = "block_refs", version = garage_version())] + #[structopt(name = "block-refs", version = garage_version())] BlockRefs, + /// Recalculate block reference counters + #[structopt(name = "block-rc", version = garage_version())] + BlockRc, /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9e4de873c..ecccdf6d8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -4,6 +4,7 @@ use std::time::Duration; use async_trait::async_trait; use tokio::sync::watch; +use garage_block::manager::BlockManager; use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; @@ -16,11 +17,14 @@ use garage_table::replication::*; use garage_table::*; use garage_util::background::*; +use garage_util::data::*; use garage_util::error::Error; use garage_util::migrate::Migrate; use crate::*; +const RC_REPAIR_ITER_COUNT: usize = 64; + pub async fn launch_online_repair( garage: &Arc, bg: &BackgroundRunner, @@ -47,6 +51,13 @@ pub async fn launch_online_repair( info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } + RepairWhat::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } RepairWhat::Blocks => { info!("Repairing the stored blocks"); bg.spawn_worker(garage_block::repair::RepairWorker::new( @@ -282,3 +293,98 @@ impl TableRepair for RepairMpu { Ok(false) } } + +// ===== block reference counter repair ===== + +pub struct BlockRcRepair { + block_manager: Arc, + block_ref_table: Arc>, + cursor: Hash, + counter: u64, + repairs: u64, +} + +impl BlockRcRepair { + fn new( + block_manager: Arc, + block_ref_table: Arc>, + ) -> Self { + Self { + block_manager, + block_ref_table, + cursor: [0u8; 32].into(), + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl Worker for BlockRcRepair { + fn name(&self) -> String { + format!("Block refcount repair worker") + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + for _i in 0..RC_REPAIR_ITER_COUNT { + let next1 = self + .block_manager + .rc + .rc + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap()); + let next2 = self + .block_ref_table + .data + .store + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(&k[..32]).unwrap()); + let next = match (next1, next2) { + (Some(k1), Some(k2)) => std::cmp::min(k1, k2), + (Some(k), None) | (None, Some(k)) => k, + (None, None) => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + if self.block_manager.rc.recalculate_rc(&next)?.1 { + self.repairs += 1; + } + self.counter += 1; + if let Some(next_incr) = next.increment() { + self.cursor = next_incr; + } else { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + } + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} diff --git a/src/model/garage.rs b/src/model/garage.rs index 4405d22d3..273690db9 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -247,6 +247,14 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); + // ---- setup block refcount recalculation ---- + // this function can be used to fix inconsistencies in the RC table + block_manager.set_recalc_rc(vec![ + block_ref_recount_fn(&block_ref_table), + // other functions could be added here if we had other tables + // that hold references to data blocks + ]); + // -- done -- Ok(Arc::new(Self { config, diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 7b023d87b..57eb7b165 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -3,8 +3,12 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::error::*; +use garage_util::migrate::Migrate; +use garage_block::CalculateRefcount; use garage_table::crdt::Crdt; +use garage_table::replication::TableShardedReplication; use garage_table::*; use garage_block::manager::*; @@ -84,3 +88,38 @@ impl TableSchema for BlockRefTable { filter.apply(entry.deleted.get()) } } + +pub fn block_ref_recount_fn( + block_ref_table: &Arc>, +) -> CalculateRefcount { + let table = Arc::downgrade(block_ref_table); + Box::new(move |tx: &db::Transaction, block: &Hash| { + let table = table + .upgrade() + .ok_or_message("cannot upgrade weak ptr to block_ref_table") + .map_err(db::TxError::Abort)?; + Ok(calculate_refcount(&table, tx, block)?) + }) +} + +fn calculate_refcount( + block_ref_table: &Table, + tx: &db::Transaction, + block: &Hash, +) -> db::TxResult { + let mut result = 0; + for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? { + let (key, value) = entry?; + if &key[..32] != block.as_slice() { + break; + } + let value = BlockRef::decode(&value) + .ok_or_message("could not decode block_ref") + .map_err(db::TxError::Abort)?; + assert_eq!(value.block, *block); + if !value.deleted.get() { + result += 1; + } + } + Ok(result) +} diff --git a/src/util/data.rs b/src/util/data.rs index 2579fd1b3..1fe7dfe09 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -83,6 +83,19 @@ impl FixedBytes32 { ret.copy_from_slice(by); Some(Self(ret)) } + /// Return the next hash + pub fn increment(&self) -> Option { + let mut ret = *self; + for byte in ret.0.iter_mut().rev() { + if *byte == u8::MAX { + *byte = 0; + } else { + *byte = *byte + 1; + return Some(ret); + } + } + return None; + } } impl From for FixedBytes32 { @@ -140,3 +153,25 @@ pub fn fasthash(data: &[u8]) -> FastHash { pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_increment() { + let zero: FixedBytes32 = [0u8; 32].into(); + let mut one: FixedBytes32 = [0u8; 32].into(); + one.0[31] = 1; + let max: FixedBytes32 = [0xFFu8; 32].into(); + assert_eq!(zero.increment(), Some(one)); + assert_eq!(max.increment(), None); + + let mut test: FixedBytes32 = [0u8; 32].into(); + let i = 0x198DF97209F8FFFFu64; + test.0[24..32].copy_from_slice(&u64::to_be_bytes(i)); + let mut test2: FixedBytes32 = [0u8; 32].into(); + test2.0[24..32].copy_from_slice(&u64::to_be_bytes(i + 1)); + assert_eq!(test.increment(), Some(test2)); + } +} diff --git a/src/util/error.rs b/src/util/error.rs index da9eda103..75fd3f9c2 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -70,6 +70,9 @@ pub enum Error { #[error(display = "Corrupt data: does not match hash {:?}", _0)] CorruptData(Hash), + #[error(display = "Missing block {:?}: no node returned a valid block", _0)] + MissingBlock(Hash), + #[error(display = "{}", _0)] Message(String), }