use std::sync::Arc; use async_trait::async_trait; use tokio::sync::watch; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; use garage_util::background::*; use garage_util::error::Error; use crate::*; pub fn launch_online_repair(garage: Arc, opt: RepairOpt) { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); garage.bucket_table.syncer.add_full_sync(); garage.object_table.syncer.add_full_sync(); garage.version_table.syncer.add_full_sync(); garage.block_ref_table.syncer.add_full_sync(); garage.key_table.syncer.add_full_sync(); } RepairWhat::Versions => { info!("Repairing the versions table"); garage .background .spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); garage .background .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); garage .background .spawn_worker(garage_block::repair::RepairWorker::new( garage.block_manager.clone(), )); } RepairWhat::Scrub { tranquility } => { info!("Verifying integrity of stored blocks"); garage .background .spawn_worker(garage_block::repair::ScrubWorker::new( garage.block_manager.clone(), tranquility, )); } } } // ---- struct RepairVersionsWorker { garage: Arc, pos: Vec, counter: usize, } impl RepairVersionsWorker { fn new(garage: Arc) -> Self { Self { garage, pos: vec![], counter: 0, } } } #[async_trait] impl Worker for RepairVersionsWorker { fn name(&self) -> String { "Version repair worker".into() } async fn work( &mut self, _must_exit: &mut watch::Receiver, ) -> Result { let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { Some((k, v)) => { self.pos = k; v } None => { info!("repair_versions: finished, done {}", self.counter); return Ok(WorkerStatus::Done); } }; self.counter += 1; if self.counter % 1000 == 0 { info!("repair_versions: {}", self.counter); } let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if !version.deleted.get() { let object = self .garage .object_table .get(&version.bucket_id, &version.key) .await?; let version_exists = match object { Some(o) => o .versions() .iter() .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), None => false, }; if !version_exists { info!("Repair versions: marking version as deleted: {:?}", version); self.garage .version_table .insert(&Version::new( version.uuid, version.bucket_id, version.key, true, )) .await?; } } Ok(WorkerStatus::Busy) } async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { unreachable!() } } // ---- struct RepairBlockrefsWorker { garage: Arc, pos: Vec, counter: usize, } impl RepairBlockrefsWorker { fn new(garage: Arc) -> Self { Self { garage, pos: vec![], counter: 0, } } } #[async_trait] impl Worker for RepairBlockrefsWorker { fn name(&self) -> String { "Block refs repair worker".into() } async fn work( &mut self, _must_exit: &mut watch::Receiver, ) -> Result { let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { Some((k, v)) => { self.pos = k; v } None => { info!("repair_block_ref: finished, done {}", self.counter); return Ok(WorkerStatus::Done); } }; self.counter += 1; if self.counter % 1000 == 0 { info!("repair_block_ref: {}", self.counter); } let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if !block_ref.deleted.get() { let version = self .garage .version_table .get(&block_ref.version, &EmptyKey) .await?; // The version might not exist if it has been GC'ed let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); self.garage .block_ref_table .insert(&BlockRef { block: block_ref.block, version: block_ref.version, deleted: true.into(), }) .await?; } } Ok(WorkerStatus::Busy) } async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { unreachable!() } }