Multipart improvements

- support part_number for HeadObject
- add checks in complete_multipart_upload
This commit is contained in:
Alex 2022-01-12 12:43:33 +01:00
parent 513a6b15f9
commit 6dab836f3a
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
5 changed files with 129 additions and 34 deletions

View file

@ -157,8 +157,12 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let resp = match endpoint { let resp = match endpoint {
Endpoint::Options => handle_options(&req, &bucket).await, Endpoint::Options => handle_options(&req, &bucket).await,
Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await, Endpoint::HeadObject {
Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await, key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
key, part_number, ..
} => handle_get(garage, &req, bucket_id, &key, part_number).await,
Endpoint::UploadPart { Endpoint::UploadPart {
key, key,
part_number, part_number,

View file

@ -58,6 +58,19 @@ pub enum Error {
#[error(display = "At least one of the preconditions you specified did not hold")] #[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed, PreconditionFailed,
/// Parts specified in CMU request do not match parts actually uploaded
#[error(display = "Parts given to CompleteMultipartUpload do not match uploaded parts")]
InvalidPart,
/// Parts given to CompleteMultipartUpload were not in ascending order
#[error(display = "Parts given to CompleteMultipartUpload were not in ascending order")]
InvalidPartOrder,
/// In CompleteMultipartUpload: not enough data
/// (here we are more lenient than AWS S3)
#[error(display = "Proposed upload is smaller than the minimum allowed object size")]
EntityTooSmall,
// Category: bad request // Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
@ -143,6 +156,9 @@ impl Error {
Error::BucketAlreadyExists => "BucketAlreadyExists", Error::BucketAlreadyExists => "BucketAlreadyExists",
Error::BucketNotEmpty => "BucketNotEmpty", Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed", Error::PreconditionFailed => "PreconditionFailed",
Error::InvalidPart => "InvalidPart",
Error::InvalidPartOrder => "InvalidPartOrder",
Error::EntityTooSmall => "EntityTooSmall",
Error::Forbidden(_) => "AccessDenied", Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented", Error::NotImplemented(_) => "NotImplemented",

View file

@ -3,6 +3,10 @@ use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH}; use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*; use futures::stream::*;
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED,
};
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
@ -24,15 +28,12 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date); let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder() let mut resp = Response::builder()
.header( .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
"Content-Type", .header(LAST_MODIFIED, date_str)
version_meta.headers.content_type.to_string(), .header(ACCEPT_RANGES, "bytes".to_string());
)
.header("Last-Modified", date_str)
.header("Accept-Ranges", "bytes".to_string());
if !version_meta.etag.is_empty() { if !version_meta.etag.is_empty() {
resp = resp.header("ETag", format!("\"{}\"", version_meta.etag)); resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
} }
for (k, v) in version_meta.headers.other.iter() { for (k, v) in version_meta.headers.other.iter() {
@ -52,7 +53,7 @@ fn try_answer_cached(
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
// being that etag based matching is more accurate, it has no issue with sub-second precision // being that etag based matching is more accurate, it has no issue with sub-second precision
// for instance (in case of very fast updates) // for instance (in case of very fast updates)
let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) { let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
let none_match = none_match.to_str().ok()?; let none_match = none_match.to_str().ok()?;
let expected = format!("\"{}\"", version_meta.etag); let expected = format!("\"{}\"", version_meta.etag);
let found = none_match let found = none_match
@ -60,7 +61,7 @@ fn try_answer_cached(
.map(str::trim) .map(str::trim)
.any(|etag| etag == expected || etag == "\"*\""); .any(|etag| etag == expected || etag == "\"*\"");
found found
} else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) { } else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
let modified_since = modified_since.to_str().ok()?; let modified_since = modified_since.to_str().ok()?;
let client_date = httpdate::parse_http_date(modified_since).ok()?; let client_date = httpdate::parse_http_date(modified_since).ok()?;
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
@ -87,6 +88,7 @@ pub async fn handle_head(
req: &Request<Body>, req: &Request<Body>,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let object = garage let object = garage
.object_table .object_table
@ -94,30 +96,68 @@ pub async fn handle_head(
.await? .await?
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
let version = object let object_version = object
.versions() .versions()
.iter() .iter()
.rev() .rev()
.find(|v| v.is_data()) .find(|v| v.is_data())
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
let version_meta = match &version.state { let version_data = match &object_version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, ObjectVersionState::Complete(c) => c,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
_ => unreachable!(), _ => unreachable!(),
}; };
if let Some(cached) = try_answer_cached(version, version_meta, req) { let version_meta = match version_data {
ObjectVersionData::Inline(meta, _) => meta,
ObjectVersionData::FirstBlock(meta, _) => meta,
_ => unreachable!(),
};
if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
return Ok(cached); return Ok(cached);
} }
let body: Body = Body::empty(); if let Some(pn) = part_number {
let response = object_headers(version, version_meta) if let ObjectVersionData::Inline(_, _) = version_data {
.header("Content-Length", format!("{}", version_meta.size)) // Not a multipart upload
.status(StatusCode::OK) return Err(Error::BadRequest(
.body(body) "Cannot process part_number argument: not a multipart upload".into(),
.unwrap(); ));
Ok(response) }
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
if !version.has_part_number(pn) {
return Err(Error::BadRequest(format!(
"Part number {} does not exist",
pn
)));
}
let part_size: u64 = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == pn)
.map(|(_, b)| b.size)
.sum();
let n_parts = version.parts_etags.items().len();
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.header("x-amz-mp-parts-count", format!("{}", n_parts))
.status(StatusCode::OK)
.body(Body::empty())?)
} else {
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(Body::empty())?)
}
} }
/// Handle GET request /// Handle GET request
@ -126,7 +166,14 @@ pub async fn handle_get(
req: &Request<Body>, req: &Request<Body>,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
if part_number.is_some() {
return Err(Error::NotImplemented(
"part_number not supported for GetObject".into(),
));
}
let object = garage let object = garage
.object_table .object_table
.get(&bucket_id, &key.to_string()) .get(&bucket_id, &key.to_string())
@ -182,7 +229,7 @@ pub async fn handle_get(
} }
let resp_builder = object_headers(last_v, last_v_meta) let resp_builder = object_headers(last_v, last_v_meta)
.header("Content-Length", format!("{}", last_v_meta.size)) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK); .status(StatusCode::OK);
match &last_v_data { match &last_v_data {
@ -238,9 +285,9 @@ async fn handle_get_range(
end: u64, end: u64,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let resp_builder = object_headers(version, version_meta) let resp_builder = object_headers(version, version_meta)
.header("Content-Length", format!("{}", end - begin)) .header(CONTENT_LENGTH, format!("{}", end - begin))
.header( .header(
"Content-Range", CONTENT_RANGE,
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
) )
.status(StatusCode::PARTIAL_CONTENT); .status(StatusCode::PARTIAL_CONTENT);

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
@ -520,6 +520,7 @@ pub async fn handle_complete_multipart_upload(
let version_uuid = decode_upload_id(upload_id)?; let version_uuid = decode_upload_id(upload_id)?;
// Get object and version
let key = key.to_string(); let key = key.to_string();
let (object, version) = futures::try_join!( let (object, version) = futures::try_join!(
garage.object_table.get(&bucket_id, &key), garage.object_table.get(&bucket_id, &key),
@ -544,6 +545,20 @@ pub async fn handle_complete_multipart_upload(
_ => unreachable!(), _ => unreachable!(),
}; };
// Check that part numbers are an increasing sequence.
// (it doesn't need to start at 1 nor to be a continuous sequence,
// see discussion in #192)
if body_list_of_parts.is_empty() {
return Err(Error::EntityTooSmall);
}
if !body_list_of_parts
.iter()
.zip(body_list_of_parts.iter().skip(1))
.all(|(p1, p2)| p1.part_number < p2.part_number)
{
return Err(Error::InvalidPartOrder);
}
// Check that the list of parts they gave us corresponds to the parts we have here // Check that the list of parts they gave us corresponds to the parts we have here
debug!("Expected parts from request: {:?}", body_list_of_parts); debug!("Expected parts from request: {:?}", body_list_of_parts);
debug!("Parts stored in version: {:?}", version.parts_etags.items()); debug!("Parts stored in version: {:?}", version.parts_etags.items());
@ -556,18 +571,31 @@ pub async fn handle_complete_multipart_upload(
.iter() .iter()
.map(|x| (&x.part_number, &x.etag)) .map(|x| (&x.part_number, &x.etag))
.eq(parts); .eq(parts);
if !same_parts {
return Err(Error::InvalidPart);
}
// Check that all blocks belong to one of the parts
let block_parts = version
.blocks
.items()
.iter()
.map(|(bk, _)| bk.part_number)
.collect::<BTreeSet<_>>();
let same_parts = body_list_of_parts
.iter()
.map(|x| x.part_number)
.eq(block_parts.into_iter());
if !same_parts { if !same_parts {
return Err(Error::BadRequest( return Err(Error::BadRequest(
"We don't have the same parts".to_string(), "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
)); ));
} }
// Calculate etag of final object // Calculate etag of final object
// To understand how etags are calculated, read more here: // To understand how etags are calculated, read more here:
// https://teppen.io/2018/06/23/aws_s3_etags/ // https://teppen.io/2018/06/23/aws_s3_etags/
let num_parts = version.blocks.items().last().unwrap().0.part_number let num_parts = body_list_of_parts.len();
- version.blocks.items().first().unwrap().0.part_number
+ 1;
let mut etag_md5_hasher = Md5::new(); let mut etag_md5_hasher = Md5::new();
for (_, etag) in version.parts_etags.items().iter() { for (_, etag) in version.parts_etags.items().iter() {
etag_md5_hasher.update(etag.as_bytes()); etag_md5_hasher.update(etag.as_bytes());

View file

@ -134,8 +134,8 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
let ret_doc = match *req.method() { let ret_doc = match *req.method() {
Method::OPTIONS => handle_options(req, &bucket).await, Method::OPTIONS => handle_options(req, &bucket).await,
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key).await, Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key).await, Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::BadRequest("HTTP method not supported".into())), _ => Err(ApiError::BadRequest("HTTP method not supported".into())),
} }
.map_err(Error::from); .map_err(Error::from);
@ -166,7 +166,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
match handle_get(garage, &req2, bucket_id, &error_document).await { match handle_get(garage, &req2, bucket_id, &error_document, None).await {
Ok(mut error_doc) => { Ok(mut error_doc) => {
// The error won't be logged back in handle_request, // The error won't be logged back in handle_request,
// so log it here // so log it here