UploadPart: automatic cleanup of version (and reference blocked) when interrupted

This commit is contained in:
Alex 2023-06-06 15:18:45 +02:00
parent c14d3735e5
commit a6cc563bdd
2 changed files with 66 additions and 15 deletions

View file

@ -96,18 +96,27 @@ pub async fn handle_put_part(
let first_block = first_block.ok_or_bad_request("Empty body")?; let first_block = first_block.ok_or_bad_request("Empty body")?;
// Calculate part identity: timestamp, version id // Calculate part identity: timestamp, version id
let version_id = gen_uuid(); let version_uuid = gen_uuid();
let mpu_part_key = MpuPartKey { let mpu_part_key = MpuPartKey {
part_number, part_number,
timestamp: mpu.next_timestamp(part_number), timestamp: mpu.next_timestamp(part_number),
}; };
// The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
upload_id,
version_uuid,
}));
// Create version and link version from MPU // Create version and link version from MPU
mpu.parts.clear(); mpu.parts.clear();
mpu.parts.put( mpu.parts.put(
mpu_part_key, mpu_part_key,
MpuPart { MpuPart {
version: version_id, version: version_uuid,
etag: None, etag: None,
size: None, size: None,
}, },
@ -115,7 +124,7 @@ pub async fn handle_put_part(
garage.mpu_table.insert(&mpu).await?; garage.mpu_table.insert(&mpu).await?;
let version = Version::new( let version = Version::new(
version_id, version_uuid,
VersionBacklink::MultipartUpload { upload_id }, VersionBacklink::MultipartUpload { upload_id },
false, false,
); );
@ -147,13 +156,17 @@ pub async fn handle_put_part(
mpu.parts.put( mpu.parts.put(
mpu_part_key, mpu_part_key,
MpuPart { MpuPart {
version: version_id, version: version_uuid,
etag: Some(data_md5sum_hex.clone()), etag: Some(data_md5sum_hex.clone()),
size: Some(total_size), size: Some(total_size),
}, },
); );
garage.mpu_table.insert(&mpu).await?; garage.mpu_table.insert(&mpu).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
let response = Response::builder() let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex)) .header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::empty()) .body(Body::empty())
@ -161,6 +174,37 @@ pub async fn handle_put_part(
Ok(response) Ok(response)
} }
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
upload_id: Uuid,
version_uuid: Uuid,
}
impl InterruptedCleanup {
fn cancel(&mut self) {
drop(self.0.take());
}
}
impl Drop for InterruptedCleanup {
fn drop(&mut self) {
if let Some(info) = self.0.take() {
tokio::spawn(async move {
let version = Version::new(
info.version_uuid,
VersionBacklink::MultipartUpload {
upload_id: info.upload_id,
},
true,
);
if let Err(e) = info.garage.version_table.insert(&version).await {
warn!("Cannot cleanup after aborted UploadPart: {}", e);
}
});
}
}
}
pub async fn handle_complete_multipart_upload( pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<Body>,

View file

@ -121,13 +121,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// The following consists in many steps that can each fail. // The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail // Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait). // before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(( let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage.clone(), garage: garage.clone(),
bucket.id, bucket_id: bucket.id,
key.into(), key: key.into(),
version_uuid, version_uuid,
version_timestamp, version_timestamp,
))); }));
// Write version identifier in object table so that we have a trace // Write version identifier in object table so that we have a trace
// that we are uploading something // that we are uploading something
@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap() .unwrap()
} }
struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
bucket_id: Uuid,
key: String,
version_uuid: Uuid,
version_timestamp: u64,
}
impl InterruptedCleanup { impl InterruptedCleanup {
fn cancel(&mut self) { fn cancel(&mut self) {
@ -442,15 +449,15 @@ impl InterruptedCleanup {
} }
impl Drop for InterruptedCleanup { impl Drop for InterruptedCleanup {
fn drop(&mut self) { fn drop(&mut self) {
if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { if let Some(info) = self.0.take() {
tokio::spawn(async move { tokio::spawn(async move {
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: version_uuid, uuid: info.version_uuid,
timestamp: version_ts, timestamp: info.version_timestamp,
state: ObjectVersionState::Aborted, state: ObjectVersionState::Aborted,
}; };
let object = Object::new(bucket_id, key, vec![object_version]); let object = Object::new(info.bucket_id, info.key, vec![object_version]);
if let Err(e) = garage.object_table.insert(&object).await { if let Err(e) = info.garage.object_table.insert(&object).await {
warn!("Cannot cleanup after aborted PutObject: {}", e); warn!("Cannot cleanup after aborted PutObject: {}", e);
} }
}); });