From 3d477906d4ff418de10973db7bd3e940f2e47b2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 9 Jun 2023 17:13:27 +0200 Subject: [PATCH] properly delete multipart uploads after completion --- src/model/s3/object_table.rs | 54 +++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index a69069b2..db5ccf96 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -397,34 +397,20 @@ impl TableSchema for ObjectTable { // 2. Enqueue propagation deletions to version table if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions for v in old_v.versions.iter() { - let newly_deleted = match new_v + let new_v_id = new_v .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())); + + // Propagate deletion of old versions to the Version table + let delete_version = match new_v_id { Err(_) => true, Ok(i) => { new_v.versions[i].state == ObjectVersionState::Aborted && v.state != ObjectVersionState::Aborted } }; - if newly_deleted { - if let ObjectVersionState::Uploading { - multipart: true, .. - } = &v.state - { - let deleted_mpu = - MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - let res = self.mpu_table.queue_insert(tx, &deleted_mpu); - if let Err(e) = db::unabort(res)? { - error!( - "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", - e - ); - } - } - + if delete_version { let deleted_version = Version::new( v.uuid, VersionBacklink::Object { @@ -441,6 +427,34 @@ impl TableSchema for ObjectTable { ); } } + + // After abortion or completion of multipart uploads, delete MPU table entry + if matches!( + v.state, + ObjectVersionState::Uploading { + multipart: true, + .. + } + ) { + let delete_mpu = match new_v_id { + Err(_) => true, + Ok(i) => !matches!( + new_v.versions[i].state, + ObjectVersionState::Uploading { .. } + ), + }; + if delete_mpu { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", + e + ); + } + } + } } }