use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::replication::*; use garage_table::*; use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; use crate::*; pub async fn launch_online_repair( garage: &Arc, bg: &BackgroundRunner, opt: RepairOpt, ) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); garage.bucket_table.syncer.add_full_sync()?; garage.object_table.syncer.add_full_sync()?; garage.version_table.syncer.add_full_sync()?; garage.block_ref_table.syncer.add_full_sync()?; garage.key_table.syncer.add_full_sync()?; } RepairWhat::Versions => { info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); bg.spawn_worker(garage_block::repair::RepairWorker::new( garage.block_manager.clone(), )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { ScrubCmd::Start => ScrubWorkerCommand::Start, ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::SetTranquility { tranquility } => { garage .block_manager .scrub_persister .set_with(|x| x.tranquility = tranquility)?; return Ok(()); } }; info!("Sending command to scrub worker: {:?}", cmd); garage.block_manager.send_scrub_command(cmd).await?; } } Ok(()) } // ---- #[async_trait] trait TableRepair: Send + Sync + 'static { type T: TableSchema; fn table(garage: &Garage) -> &Table; async fn process( &mut self, garage: &Garage, entry: <::T as TableSchema>::E, ) -> Result; } struct TableRepairWorker { garage: Arc, pos: Vec, counter: usize, repairs: usize, inner: T, } impl TableRepairWorker { fn new(garage: Arc, inner: R) -> Self { Self { garage, inner, pos: vec![], counter: 0, repairs: 0, } } } #[async_trait] impl Worker for TableRepairWorker { fn name(&self) -> String { format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { info!( "{}: finished, done {}, fixed {}", self.name(), self.counter, self.repairs ); return Ok(WorkerState::Done); } }; let entry = ::E::decode(&item_bytes) .ok_or_message("Cannot decode table entry")?; if self.inner.process(&self.garage, entry).await? { self.repairs += 1; } self.counter += 1; self.pos = next_pos; Ok(WorkerState::Busy) } async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } // ---- struct RepairVersions; #[async_trait] impl TableRepair for RepairVersions { type T = VersionTable; fn table(garage: &Garage) -> &Table { &garage.version_table } async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let version_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => { let object = garage.object_table.get(&bucket_id, &key).await?; match object { Some(o) => o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted }), None => false, } } VersionBacklink::MultipartUpload { upload_id } => { let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; match mpu { Some(u) => !u.deleted.get(), None => false, } } }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table .insert(&Version::new(version.uuid, version.backlink, true)) .await?; return Ok(true); } } Ok(false) } } // ---- struct RepairBlockRefs; #[async_trait] impl TableRepair for RepairBlockRefs { type T = BlockRefTable; fn table(garage: &Garage) -> &Table { &garage.block_ref_table } async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; // The version might not exist if it has been GC'ed let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); garage .block_ref_table .insert(&BlockRef { block: block_ref.block, version: block_ref.version, deleted: true.into(), }) .await?; return Ok(true); } } Ok(false) } }