Merge pull request 'block refcount repair' (#782) from block-ref-repair into next-0.10
All checks were successful
ci/woodpecker/push/debug Pipeline was successful
ci/woodpecker/pr/debug Pipeline was successful

Reviewed-on: #782
This commit is contained in:
Alex 2024-03-19 15:59:19 +00:00
commit 65853a4863
13 changed files with 302 additions and 22 deletions

View file

@ -141,4 +141,7 @@ blocks may still be held by Garage. If you suspect that such corruption has occu
in your cluster, you can run one of the following repair procedures:
- `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version
- `garage repair block_refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)
- `garage repair block-refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)
- `garage repair block-rc`: checks that the reference counters for blocks are in sync with the actual number of non-deleted entries in the block reference table

View file

@ -11,3 +11,4 @@ mod metrics;
mod rc;
pub use block::zstd_encode;
pub use rc::CalculateRefcount;

View file

@ -88,7 +88,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@ -156,7 +156,7 @@ impl BlockManager {
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc.clone(),
rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
);
@ -229,6 +229,12 @@ impl BlockManager {
}
}
/// Initialization: set how block references are recalculated
/// for repair operations
pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
self.rc.recalc_rc.store(Some(Arc::new(recalc)));
}
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@ -316,9 +322,9 @@ impl BlockManager {
};
}
let msg = format!("Get block {:?}: no node returned a valid block", hash);
debug!("{}", msg);
Err(Error::Message(msg))
let err = Error::MissingBlock(*hash);
debug!("{}", err);
Err(err)
}
// ---- Public interface ----
@ -381,7 +387,7 @@ impl BlockManager {
/// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> {
Ok(self.rc.rc.len()?)
Ok(self.rc.rc_table.len()?)
}
/// Send command to start/stop/manager scrub worker

View file

@ -1,5 +1,7 @@
use std::convert::TryInto;
use arc_swap::ArcSwapOption;
use garage_db as db;
use garage_util::data::*;
@ -8,13 +10,20 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
pub type CalculateRefcount =
Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
pub struct BlockRc {
pub(crate) rc: db::Tree,
pub rc_table: db::Tree,
pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
}
impl BlockRc {
pub(crate) fn new(rc: db::Tree) -> Self {
Self { rc }
Self {
rc_table: rc,
recalc_rc: ArcSwapOption::new(None),
}
}
/// Increment the reference counter associated to a hash.
@ -24,9 +33,9 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match old_rc.increment().serialize() {
Some(x) => tx.insert(&self.rc, hash, x)?,
Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => unreachable!(),
};
Ok(old_rc.is_zero())
@ -39,28 +48,28 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement();
match new_rc.serialize() {
Some(x) => tx.insert(&self.rc, hash, x)?,
None => tx.remove(&self.rc, hash)?,
Some(x) => tx.insert(&self.rc_table, hash, x)?,
None => tx.remove(&self.rc_table, hash)?,
};
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
/// Read a block's reference count
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?))
}
/// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
self.rc.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
self.rc_table.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
tx.remove(&self.rc, hash)?;
tx.remove(&self.rc_table, hash)?;
}
_ => (),
};
@ -68,6 +77,58 @@ impl BlockRc {
})?;
Ok(())
}
/// Recalculate the reference counter of a block
/// to fix potential inconsistencies
pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
trace!("Repair block RC for {:?}", hash);
let res = self
.rc_table
.db()
.transaction(|tx| {
let mut cnt = 0;
for f in recalc_fns.iter() {
cnt += f(&tx, hash)?;
}
let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
trace!(
"Block RC for {:?}: stored={}, calculated={}",
hash,
old_rc.as_u64(),
cnt
);
if cnt as u64 != old_rc.as_u64() {
warn!(
"Fixing inconsistent block RC for {:?}: was {}, should be {}",
hash,
old_rc.as_u64(),
cnt
);
let new_rc = if cnt > 0 {
RcEntry::Present { count: cnt as u64 }
} else {
RcEntry::Deletable {
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
}
};
tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?;
Ok((cnt, true))
} else {
Ok((cnt, false))
}
})
.map_err(Error::from);
if let Err(e) = &res {
error!("Failed to fix RC for block {:?}: {}", hash, e);
}
res
} else {
Err(Error::Message(
"Block RC recalculation is not available at this point".into(),
))
}
}
}
/// Describes the state of the reference counter for a block

View file

@ -107,7 +107,7 @@ impl Worker for RepairWorker {
for entry in self
.manager
.rc
.rc
.rc_table
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;

View file

@ -367,6 +367,13 @@ impl BlockResyncManager {
}
if exists && rc.is_deletable() {
if manager.rc.recalculate_rc(hash)?.0 > 0 {
return Err(Error::Message(format!(
"Refcount for block {:?} was inconsistent, retrying later",
hash
)));
}
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
@ -453,7 +460,15 @@ impl BlockResyncManager {
hash
);
let block_data = manager.rpc_get_raw_block(hash, None).await?;
let block_data = manager.rpc_get_raw_block(hash, None).await;
if matches!(block_data, Err(Error::MissingBlock(_))) {
warn!(
"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
hash
);
manager.rc.recalculate_rc(hash)?;
}
let block_data = block_data?;
manager.metrics.resync_recv_counter.add(1);

View file

@ -473,8 +473,11 @@ pub enum RepairWhat {
#[structopt(name = "mpu", version = garage_version())]
MultipartUploads,
/// Repropagate version deletions to the block ref table
#[structopt(name = "block_refs", version = garage_version())]
#[structopt(name = "block-refs", version = garage_version())]
BlockRefs,
/// Recalculate block reference counters
#[structopt(name = "block-rc", version = garage_version())]
BlockRc,
/// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())]
Scrub {

View file

@ -451,7 +451,7 @@ pub fn print_block_info(
if refcount != nondeleted_count {
println!();
println!(
"Warning: refcount does not match number of non-deleted versions (see issue #644)."
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
);
}
}

View file

@ -4,6 +4,7 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
@ -16,11 +17,14 @@ use garage_table::replication::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
use crate::*;
const RC_REPAIR_ITER_COUNT: usize = 64;
pub async fn launch_online_repair(
garage: &Arc<Garage>,
bg: &BackgroundRunner,
@ -47,6 +51,13 @@ pub async fn launch_online_repair(
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairWhat::BlockRc => {
info!("Repairing the block reference counters");
bg.spawn_worker(BlockRcRepair::new(
garage.block_manager.clone(),
garage.block_ref_table.clone(),
));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
@ -282,3 +293,98 @@ impl TableRepair for RepairMpu {
Ok(false)
}
}
// ===== block reference counter repair =====
pub struct BlockRcRepair {
block_manager: Arc<BlockManager>,
block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
cursor: Hash,
counter: u64,
repairs: u64,
}
impl BlockRcRepair {
fn new(
block_manager: Arc<BlockManager>,
block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
) -> Self {
Self {
block_manager,
block_ref_table,
cursor: [0u8; 32].into(),
counter: 0,
repairs: 0,
}
}
}
#[async_trait]
impl Worker for BlockRcRepair {
fn name(&self) -> String {
format!("Block refcount repair worker")
}
fn status(&self) -> WorkerStatus {
WorkerStatus {
progress: Some(format!("{} ({})", self.counter, self.repairs)),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
for _i in 0..RC_REPAIR_ITER_COUNT {
let next1 = self
.block_manager
.rc
.rc_table
.range(self.cursor.as_slice()..)?
.next()
.transpose()?
.map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
let next2 = self
.block_ref_table
.data
.store
.range(self.cursor.as_slice()..)?
.next()
.transpose()?
.map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
let next = match (next1, next2) {
(Some(k1), Some(k2)) => std::cmp::min(k1, k2),
(Some(k), None) | (None, Some(k)) => k,
(None, None) => {
info!(
"{}: finished, done {}, fixed {}",
self.name(),
self.counter,
self.repairs
);
return Ok(WorkerState::Done);
}
};
if self.block_manager.rc.recalculate_rc(&next)?.1 {
self.repairs += 1;
}
self.counter += 1;
if let Some(next_incr) = next.increment() {
self.cursor = next_incr;
} else {
info!(
"{}: finished, done {}, fixed {}",
self.name(),
self.counter,
self.repairs
);
return Ok(WorkerState::Done);
}
}
Ok(WorkerState::Busy)
}
async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}

View file

@ -247,6 +247,14 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
// ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table
block_manager.set_recalc_rc(vec![
block_ref_recount_fn(&block_ref_table),
// other functions could be added here if we had other tables
// that hold references to data blocks
]);
// -- done --
Ok(Arc::new(Self {
config,

View file

@ -3,8 +3,12 @@ use std::sync::Arc;
use garage_db as db;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::migrate::Migrate;
use garage_block::CalculateRefcount;
use garage_table::crdt::Crdt;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use garage_block::manager::*;
@ -84,3 +88,38 @@ impl TableSchema for BlockRefTable {
filter.apply(entry.deleted.get())
}
}
pub fn block_ref_recount_fn(
block_ref_table: &Arc<Table<BlockRefTable, TableShardedReplication>>,
) -> CalculateRefcount {
let table = Arc::downgrade(block_ref_table);
Box::new(move |tx: &db::Transaction, block: &Hash| {
let table = table
.upgrade()
.ok_or_message("cannot upgrade weak ptr to block_ref_table")
.map_err(db::TxError::Abort)?;
Ok(calculate_refcount(&table, tx, block)?)
})
}
fn calculate_refcount(
block_ref_table: &Table<BlockRefTable, TableShardedReplication>,
tx: &db::Transaction,
block: &Hash,
) -> db::TxResult<usize, Error> {
let mut result = 0;
for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? {
let (key, value) = entry?;
if &key[..32] != block.as_slice() {
break;
}
let value = BlockRef::decode(&value)
.ok_or_message("could not decode block_ref")
.map_err(db::TxError::Abort)?;
assert_eq!(value.block, *block);
if !value.deleted.get() {
result += 1;
}
}
Ok(result)
}

View file

@ -83,6 +83,19 @@ impl FixedBytes32 {
ret.copy_from_slice(by);
Some(Self(ret))
}
/// Return the next hash
pub fn increment(&self) -> Option<Self> {
let mut ret = *self;
for byte in ret.0.iter_mut().rev() {
if *byte == u8::MAX {
*byte = 0;
} else {
*byte = *byte + 1;
return Some(ret);
}
}
return None;
}
}
impl From<garage_net::NodeID> for FixedBytes32 {
@ -140,3 +153,25 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_increment() {
let zero: FixedBytes32 = [0u8; 32].into();
let mut one: FixedBytes32 = [0u8; 32].into();
one.0[31] = 1;
let max: FixedBytes32 = [0xFFu8; 32].into();
assert_eq!(zero.increment(), Some(one));
assert_eq!(max.increment(), None);
let mut test: FixedBytes32 = [0u8; 32].into();
let i = 0x198DF97209F8FFFFu64;
test.0[24..32].copy_from_slice(&u64::to_be_bytes(i));
let mut test2: FixedBytes32 = [0u8; 32].into();
test2.0[24..32].copy_from_slice(&u64::to_be_bytes(i + 1));
assert_eq!(test.increment(), Some(test2));
}
}

View file

@ -70,6 +70,9 @@ pub enum Error {
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash),
#[error(display = "Missing block {:?}: no node returned a valid block", _0)]
MissingBlock(Hash),
#[error(display = "{}", _0)]
Message(String),
}