From 0877a5500cca4f3aa85da4fff4225b154a159a09 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 26 Apr 2020 20:46:21 +0000 Subject: [PATCH] Abort multipart upload --- TODO | 2 +- src/api/api_server.rs | 16 ++++++++++++---- src/api/s3_put.rs | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/TODO b/TODO index 5d74e6c08..5a995d468 100644 --- a/TODO +++ b/TODO @@ -9,7 +9,7 @@ Attaining S3 compatibility -------------------------- - test & fix multipart uploads -- abort multipart upload + - fix sync not working in some cases ? (when starting from empty?) - api_server following the S3 semantics for head/get/put/list/delete: verify more that it works as intended diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 947d1a918..b92f44038 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -120,6 +120,7 @@ async fn handler_inner( .iter() .all(|x| params.contains_key(&x.to_string())) { + // UploadPart query let part_number = params.get("partnumber").unwrap(); let upload_id = params.get("uploadid").unwrap(); Ok(handle_put_part(garage, req, &bucket, &key, part_number, upload_id).await?) @@ -129,16 +130,23 @@ async fn handler_inner( } } &Method::DELETE => { - // DeleteObject query - let version_uuid = handle_delete(garage, &bucket, &key).await?; - let response = format!("{}\n", hex::encode(version_uuid)); - Ok(Response::new(Box::new(BytesBody::from(response)))) + if params.contains_key(&"uploadid".to_string()) { + // AbortMultipartUpload query + let upload_id = params.get("uploadid").unwrap(); + Ok(handle_abort_multipart_upload(garage, &bucket, &key, upload_id).await?) + } else { + // DeleteObject query + let version_uuid = handle_delete(garage, &bucket, &key).await?; + let response = format!("{}\n", hex::encode(version_uuid)); + Ok(Response::new(Box::new(BytesBody::from(response)))) + } } &Method::POST => { if params.contains_key(&"uploads".to_string()) { // CreateMultipartUpload call Ok(handle_create_multipart_upload(garage, &req, &bucket, &key).await?) } else if params.contains_key(&"uploadid".to_string()) { + // CompleteMultipartUpload call let upload_id = params.get("uploadid").unwrap(); Ok(handle_complete_multipart_upload(garage, req, &bucket, &key, upload_id).await?) } else { diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index f1755c172..70a467a89 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -333,6 +333,38 @@ pub async fn handle_complete_multipart_upload( Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) } +pub async fn handle_abort_multipart_upload( + garage: Arc, + bucket: &str, + key: &str, + upload_id: &str, +) -> Result, Error> { + let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + + let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?; + let object = match object { + None => return Err(Error::BadRequest(format!("Object not found"))), + Some(x) => x, + }; + let object_version = object.versions().iter().find(|v| { + v.uuid == version_uuid + && v.state == ObjectVersionState::Uploading + && v.data == ObjectVersionData::Uploading + }); + let mut object_version = match object_version { + None => return Err(Error::BadRequest(format!( + "Multipart upload does not exist or has already been completed" + ))), + Some(x) => x.clone(), + }; + + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + Ok(Response::new(Box::new(BytesBody::from(vec![])))) +} + fn get_mime_type(req: &Request) -> Result { Ok(req .headers()