From 058518c22b701d5d2dc3e838518d88ce9a4cc875 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:36:48 +0200 Subject: [PATCH] refactor repair workers with a trait --- src/garage/repair/online.rs | 175 +++++++++++++++++++----------------- 1 file changed, 94 insertions(+), 81 deletions(-) diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9b6b3cad..6d8a91fe 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,15 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +36,11 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,71 +71,70 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table; + + async fn process( + &mut self, + garage: &Garage, + entry: <::T as TableSchema>::E, + ) -> Result; +} + +struct TableRepairWorker { garage: Arc, pos: Vec, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc) -> Self { +impl TableRepairWorker { + fn new(garage: Arc, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl Worker for TableRepairWorker { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; - if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - let object = self.garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { - x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = self.garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new(version.uuid, version.backlink, true)) - .await?; - } + let entry = ::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; } self.counter += 1; @@ -147,49 +150,65 @@ impl Worker for RepairVersionsWorker { // ---- -struct RepairBlockrefsWorker { - garage: Arc, - pos: Vec, - counter: usize, -} - -impl RepairBlockrefsWorker { - fn new(garage: Arc) -> Self { - Self { - garage, - pos: vec![], - counter: 0, - } - } -} +struct RepairVersions; #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table { + &garage.version_table } - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } - } - - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); + async fn process(&mut self, garage: &Garage, version: Version) -> Result { + if !version.deleted.get() { + let version_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => { + let object = garage.object_table.get(&bucket_id, &key).await?; + match object { + Some(o) => o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }), + None => false, + } + } + VersionBacklink::MultipartUpload { upload_id } => { + let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; + match mpu { + Some(u) => !u.deleted.get(), + None => false, + } } }; + if !version_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + garage + .version_table + .insert(&Version::new(version.uuid, version.backlink, true)) + .await?; + return Ok(true); + } + } - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + Ok(false) + } +} + +// ---- + +struct RepairBlockRefs; + +#[async_trait] +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; + + fn table(garage: &Garage) -> &Table { + &garage.block_ref_table + } + + async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = self - .garage + let version = garage .version_table .get(&block_ref.version, &EmptyKey) .await?; @@ -200,7 +219,7 @@ impl Worker for RepairBlockrefsWorker { "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage + garage .block_ref_table .insert(&BlockRef { block: block_ref.block, @@ -208,16 +227,10 @@ impl Worker for RepairBlockrefsWorker { deleted: true.into(), }) .await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; - - Ok(WorkerState::Busy) - } - - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + Ok(false) } }