From 776294ebbe3b54314bbc3b6d18511a51ebe2a710 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 19 Mar 2025 12:39:32 +0100 Subject: [PATCH] implement repair procedure to fix inconsistent bucket aliases --- src/garage/cli/structs.rs | 3 + src/garage/repair/online.rs | 4 + src/model/helper/locked.rs | 193 ++++++++++++++++++++++++++++++++++++ 3 files changed, 200 insertions(+) diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4ec35e68..3652ef6b 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -478,6 +478,9 @@ pub enum RepairWhat { /// Recalculate block reference counters #[structopt(name = "block-rc", version = garage_version())] BlockRc, + /// Fix inconsistency in bucket aliases (WARNING: EXPERIMENTAL) + #[structopt(name = "aliases", version = garage_version())] + Aliases, /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 47883f97..950cd5f7 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -88,6 +88,10 @@ pub async fn launch_online_repair( garage.block_manager.clone(), )); } + RepairWhat::Aliases => { + info!("Repairing bucket aliases (foreground)"); + garage.locked_helper().await.repair_aliases().await?; + } } Ok(()) } diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs index a36a05e0..7088cdce 100644 --- a/src/model/helper/locked.rs +++ b/src/model/helper/locked.rs @@ -1,3 +1,7 @@ +use std::collections::{HashMap, HashSet}; + +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; @@ -459,4 +463,193 @@ impl<'a> LockedHelper<'a> { Ok(()) } + + // ================================================ + // repair procedure + // ================================================ + + pub async fn repair_aliases(&self) -> Result<(), GarageError> { + self.0.db.transaction(|tx| { + info!("--- begin repair_aliases transaction ----"); + + // 1. List all non-deleted buckets, so that we can fix bad aliases + let mut all_buckets: HashSet = HashSet::new(); + + for item in tx.range::<&[u8], _>(&self.0.bucket_table.data.store, ..)? { + let bucket = self + .0 + .bucket_table + .data + .decode_entry(&(item?.1)) + .map_err(db::TxError::Abort)?; + if !bucket.is_deleted() { + all_buckets.insert(bucket.id); + } + } + + info!("number of buckets: {}", all_buckets.len()); + + // 2. List all aliases declared in bucket_alias_table and key_table + // Take note of aliases that point to non-existing buckets + let mut global_aliases: HashMap = HashMap::new(); + + { + let mut delete_global = vec![]; + for item in tx.range::<&[u8], _>(&self.0.bucket_alias_table.data.store, ..)? { + let mut alias = self + .0 + .bucket_alias_table + .data + .decode_entry(&(item?.1)) + .map_err(db::TxError::Abort)?; + if let Some(id) = alias.state.get() { + if all_buckets.contains(id) { + // keep aliases + global_aliases.insert(alias.name().to_string(), *id); + } else { + // delete alias + warn!( + "global alias: remove {} -> {:?} (bucket is deleted)", + alias.name(), + id + ); + alias.state.update(None); + delete_global.push(alias); + } + } + } + + info!("number of global aliases: {}", global_aliases.len()); + + info!("global alias table: {} entries fixed", delete_global.len()); + for ga in delete_global { + debug!("Enqueue update to global alias table: {:?}", ga); + self.0.bucket_alias_table.queue_insert(tx, &ga)?; + } + } + + let mut local_aliases: HashMap<(String, String), Uuid> = HashMap::new(); + + { + let mut delete_local = vec![]; + + for item in tx.range::<&[u8], _>(&self.0.key_table.data.store, ..)? { + let mut key = self + .0 + .key_table + .data + .decode_entry(&(item?.1)) + .map_err(db::TxError::Abort)?; + let Some(p) = key.state.as_option_mut() else { + continue; + }; + let mut has_changes = false; + for (name, _, to) in p.local_aliases.items().to_vec() { + if let Some(id) = to { + if all_buckets.contains(&id) { + local_aliases.insert((key.key_id.clone(), name), id); + } else { + warn!( + "local alias: remove ({}, {}) -> {:?} (bucket is deleted)", + key.key_id, name, id + ); + p.local_aliases.update_in_place(name, None); + has_changes = true; + } + } + } + if has_changes { + delete_local.push(key); + } + } + + info!("number of local aliases: {}", local_aliases.len()); + + info!("key table: {} entries fixed", delete_local.len()); + for la in delete_local { + debug!("Enqueue update to key table: {:?}", la); + self.0.key_table.queue_insert(tx, &la)?; + } + } + + // 4. Reverse the alias maps to determine the aliases per-bucket + let mut bucket_global: HashMap> = HashMap::new(); + let mut bucket_local: HashMap> = HashMap::new(); + + for (name, bucket) in global_aliases { + bucket_global.entry(bucket).or_default().push(name); + } + for ((key, name), bucket) in local_aliases { + bucket_local.entry(bucket).or_default().push((key, name)); + } + + // 5. Fix the bucket table to ensure consistency + let mut bucket_updates = vec![]; + + for item in tx.range::<&[u8], _>(&self.0.bucket_table.data.store, ..)? { + let bucket = self + .0 + .bucket_table + .data + .decode_entry(&(item?.1)) + .map_err(db::TxError::Abort)?; + let mut bucket2 = bucket.clone(); + let Some(param) = bucket2.state.as_option_mut() else { + continue; + }; + + // fix global aliases + { + let ga = bucket_global.remove(&bucket.id).unwrap_or_default(); + for (name, _, active) in param.aliases.items().to_vec() { + if active && !ga.contains(&name) { + warn!("bucket {:?}: remove global alias {}", bucket.id, name); + param.aliases.update_in_place(name, false); + } + } + for name in ga { + if param.aliases.get(&name).copied().unwrap_or(false) == false { + warn!("bucket {:?}: add global alias {}", bucket.id, name); + param.aliases.update_in_place(name, true); + } + } + } + + // fix local aliases + { + let la = bucket_local.remove(&bucket.id).unwrap_or_default(); + for (pair, _, active) in param.local_aliases.items().to_vec() { + if active && !la.contains(&pair) { + warn!("bucket {:?}: remove local alias {:?}", bucket.id, pair); + param.local_aliases.update_in_place(pair, false); + } + } + for pair in la { + if param.local_aliases.get(&pair).copied().unwrap_or(false) == false { + warn!("bucket {:?}: add local alias {:?}", bucket.id, pair); + param.local_aliases.update_in_place(pair, true); + } + } + } + + if bucket2 != bucket { + bucket_updates.push(bucket2); + } + } + + info!("bucket table: {} entries fixed", bucket_updates.len()); + for b in bucket_updates { + debug!("Enqueue update to bucket table: {:?}", b); + self.0.bucket_table.queue_insert(tx, &b)?; + } + + info!("--- end repair_aliases transaction ----"); + + Ok(()) + })?; + + info!("repair_aliases is done"); + + Ok(()) + } }