Abort multipart upload
This commit is contained in:
parent
81ecc4999e
commit
0877a5500c
3 changed files with 45 additions and 5 deletions
2
TODO
2
TODO
|
@ -9,7 +9,7 @@ Attaining S3 compatibility
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
- test & fix multipart uploads
|
- test & fix multipart uploads
|
||||||
- abort multipart upload
|
|
||||||
- fix sync not working in some cases ? (when starting from empty?)
|
- fix sync not working in some cases ? (when starting from empty?)
|
||||||
|
|
||||||
- api_server following the S3 semantics for head/get/put/list/delete: verify more that it works as intended
|
- api_server following the S3 semantics for head/get/put/list/delete: verify more that it works as intended
|
||||||
|
|
|
@ -120,6 +120,7 @@ async fn handler_inner(
|
||||||
.iter()
|
.iter()
|
||||||
.all(|x| params.contains_key(&x.to_string()))
|
.all(|x| params.contains_key(&x.to_string()))
|
||||||
{
|
{
|
||||||
|
// UploadPart query
|
||||||
let part_number = params.get("partnumber").unwrap();
|
let part_number = params.get("partnumber").unwrap();
|
||||||
let upload_id = params.get("uploadid").unwrap();
|
let upload_id = params.get("uploadid").unwrap();
|
||||||
Ok(handle_put_part(garage, req, &bucket, &key, part_number, upload_id).await?)
|
Ok(handle_put_part(garage, req, &bucket, &key, part_number, upload_id).await?)
|
||||||
|
@ -129,16 +130,23 @@ async fn handler_inner(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
&Method::DELETE => {
|
&Method::DELETE => {
|
||||||
// DeleteObject query
|
if params.contains_key(&"uploadid".to_string()) {
|
||||||
let version_uuid = handle_delete(garage, &bucket, &key).await?;
|
// AbortMultipartUpload query
|
||||||
let response = format!("{}\n", hex::encode(version_uuid));
|
let upload_id = params.get("uploadid").unwrap();
|
||||||
Ok(Response::new(Box::new(BytesBody::from(response))))
|
Ok(handle_abort_multipart_upload(garage, &bucket, &key, upload_id).await?)
|
||||||
|
} else {
|
||||||
|
// DeleteObject query
|
||||||
|
let version_uuid = handle_delete(garage, &bucket, &key).await?;
|
||||||
|
let response = format!("{}\n", hex::encode(version_uuid));
|
||||||
|
Ok(Response::new(Box::new(BytesBody::from(response))))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
&Method::POST => {
|
&Method::POST => {
|
||||||
if params.contains_key(&"uploads".to_string()) {
|
if params.contains_key(&"uploads".to_string()) {
|
||||||
// CreateMultipartUpload call
|
// CreateMultipartUpload call
|
||||||
Ok(handle_create_multipart_upload(garage, &req, &bucket, &key).await?)
|
Ok(handle_create_multipart_upload(garage, &req, &bucket, &key).await?)
|
||||||
} else if params.contains_key(&"uploadid".to_string()) {
|
} else if params.contains_key(&"uploadid".to_string()) {
|
||||||
|
// CompleteMultipartUpload call
|
||||||
let upload_id = params.get("uploadid").unwrap();
|
let upload_id = params.get("uploadid").unwrap();
|
||||||
Ok(handle_complete_multipart_upload(garage, req, &bucket, &key, upload_id).await?)
|
Ok(handle_complete_multipart_upload(garage, req, &bucket, &key, upload_id).await?)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -333,6 +333,38 @@ pub async fn handle_complete_multipart_upload(
|
||||||
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn handle_abort_multipart_upload(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
upload_id: &str,
|
||||||
|
) -> Result<Response<BodyType>, Error> {
|
||||||
|
let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
|
||||||
|
|
||||||
|
let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?;
|
||||||
|
let object = match object {
|
||||||
|
None => return Err(Error::BadRequest(format!("Object not found"))),
|
||||||
|
Some(x) => x,
|
||||||
|
};
|
||||||
|
let object_version = object.versions().iter().find(|v| {
|
||||||
|
v.uuid == version_uuid
|
||||||
|
&& v.state == ObjectVersionState::Uploading
|
||||||
|
&& v.data == ObjectVersionData::Uploading
|
||||||
|
});
|
||||||
|
let mut object_version = match object_version {
|
||||||
|
None => return Err(Error::BadRequest(format!(
|
||||||
|
"Multipart upload does not exist or has already been completed"
|
||||||
|
))),
|
||||||
|
Some(x) => x.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
|
let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
|
||||||
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
||||||
|
Ok(Response::new(Box::new(BytesBody::from(vec![]))))
|
||||||
|
}
|
||||||
|
|
||||||
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
||||||
Ok(req
|
Ok(req
|
||||||
.headers()
|
.headers()
|
||||||
|
|
Loading…
Reference in a new issue