diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 986592aec..6444d3744 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -452,6 +452,9 @@ pub enum RepairWhat { /// Only redo the propagation of object deletions to the version table (slow) #[structopt(name = "versions", version = garage_version())] Versions, + /// Only redo the propagation of object deletions to the multipart upload table (slow) + #[structopt(name = "mpu", version = garage_version())] + MultipartUploads, /// Only redo the propagation of version deletions to the block ref table (extremely slow) #[structopt(name = "block_refs", version = garage_version())] BlockRefs, diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6d8a91fe7..9de6166d2 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -8,6 +8,7 @@ use garage_block::repair::ScrubWorkerCommand; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -38,6 +39,10 @@ pub async fn launch_online_repair( info!("Repairing the versions table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); @@ -162,25 +167,26 @@ impl TableRepair for RepairVersions { async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { - let version_exists = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => { - let object = garage.object_table.get(&bucket_id, &key).await?; - match object { - Some(o) => o.versions().iter().any(|x| { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(&bucket_id, &key) + .await? + .map(|o| { + o.versions().iter().any(|x| { x.uuid == version.uuid && x.state != ObjectVersionState::Aborted - }), - None => false, - } - } - VersionBacklink::MultipartUpload { upload_id } => { - let mpu = garage.mpu_table.get(&upload_id, &EmptyKey).await?; - match mpu { - Some(u) => !u.deleted.get(), - None => false, - } - } + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(&upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), }; - if !version_exists { + + if !ref_exists { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table @@ -206,27 +212,63 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, block_ref: BlockRef) -> Result { + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { if !block_ref.deleted.get() { - let version = garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table { + &garage.mpu_table + } + + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; return Ok(true); } }