Handle part_number in HeadObject and GetObject (#191) #192

Merged
lx merged 2 commits from get-head-part-number into main 2022-01-24 21:00:34 +00:00
5 changed files with 268 additions and 100 deletions

View file

@ -157,8 +157,12 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let resp = match endpoint { let resp = match endpoint {
Endpoint::Options => handle_options(&req, &bucket).await, Endpoint::Options => handle_options(&req, &bucket).await,
Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await, Endpoint::HeadObject {
Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await, key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
key, part_number, ..
} => handle_get(garage, &req, bucket_id, &key, part_number).await,
Endpoint::UploadPart { Endpoint::UploadPart {
key, key,
part_number, part_number,

View file

@ -58,6 +58,19 @@ pub enum Error {
#[error(display = "At least one of the preconditions you specified did not hold")] #[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed, PreconditionFailed,
/// Parts specified in CMU request do not match parts actually uploaded
#[error(display = "Parts given to CompleteMultipartUpload do not match uploaded parts")]
InvalidPart,
/// Parts given to CompleteMultipartUpload were not in ascending order
#[error(display = "Parts given to CompleteMultipartUpload were not in ascending order")]
InvalidPartOrder,
/// In CompleteMultipartUpload: not enough data
/// (here we are more lenient than AWS S3)
#[error(display = "Proposed upload is smaller than the minimum allowed object size")]
EntityTooSmall,
// Category: bad request // Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
@ -143,6 +156,9 @@ impl Error {
Error::BucketAlreadyExists => "BucketAlreadyExists", Error::BucketAlreadyExists => "BucketAlreadyExists",
Error::BucketNotEmpty => "BucketNotEmpty", Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed", Error::PreconditionFailed => "PreconditionFailed",
Error::InvalidPart => "InvalidPart",
Error::InvalidPartOrder => "InvalidPartOrder",
Error::EntityTooSmall => "EntityTooSmall",
Error::Forbidden(_) => "AccessDenied", Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented", Error::NotImplemented(_) => "NotImplemented",

View file

@ -3,6 +3,10 @@ use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH}; use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*; use futures::stream::*;
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
@ -11,6 +15,7 @@ use garage_util::data::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*; use crate::error::*;
@ -24,15 +29,12 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date); let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder() let mut resp = Response::builder()
.header( .header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
"Content-Type", .header(LAST_MODIFIED, date_str)
version_meta.headers.content_type.to_string(), .header(ACCEPT_RANGES, "bytes".to_string());
)
.header("Last-Modified", date_str)
.header("Accept-Ranges", "bytes".to_string());
if !version_meta.etag.is_empty() { if !version_meta.etag.is_empty() {
resp = resp.header("ETag", format!("\"{}\"", version_meta.etag)); resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
} }
for (k, v) in version_meta.headers.other.iter() { for (k, v) in version_meta.headers.other.iter() {
@ -52,7 +54,7 @@ fn try_answer_cached(
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
// being that etag based matching is more accurate, it has no issue with sub-second precision // being that etag based matching is more accurate, it has no issue with sub-second precision
// for instance (in case of very fast updates) // for instance (in case of very fast updates)
let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) { let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
let none_match = none_match.to_str().ok()?; let none_match = none_match.to_str().ok()?;
let expected = format!("\"{}\"", version_meta.etag); let expected = format!("\"{}\"", version_meta.etag);
let found = none_match let found = none_match
@ -60,7 +62,7 @@ fn try_answer_cached(
.map(str::trim) .map(str::trim)
.any(|etag| etag == expected || etag == "\"*\""); .any(|etag| etag == expected || etag == "\"*\"");
found found
} else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) { } else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
let modified_since = modified_since.to_str().ok()?; let modified_since = modified_since.to_str().ok()?;
let client_date = httpdate::parse_http_date(modified_since).ok()?; let client_date = httpdate::parse_http_date(modified_since).ok()?;
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
@ -87,6 +89,7 @@ pub async fn handle_head(
req: &Request<Body>, req: &Request<Body>,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let object = garage let object = garage
.object_table .object_table
@ -94,30 +97,68 @@ pub async fn handle_head(
.await? .await?
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
let version = object let object_version = object
.versions() .versions()
.iter() .iter()
.rev() .rev()
.find(|v| v.is_data()) .find(|v| v.is_data())
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
let version_meta = match &version.state { let version_data = match &object_version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, ObjectVersionState::Complete(c) => c,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
_ => unreachable!(), _ => unreachable!(),
}; };
if let Some(cached) = try_answer_cached(version, version_meta, req) { let version_meta = match version_data {
ObjectVersionData::Inline(meta, _) => meta,
ObjectVersionData::FirstBlock(meta, _) => meta,
_ => unreachable!(),
};
if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
return Ok(cached); return Ok(cached);
} }
let body: Body = Body::empty(); if let Some(pn) = part_number {
let response = object_headers(version, version_meta) if let ObjectVersionData::Inline(_, _) = version_data {
.header("Content-Length", format!("{}", version_meta.size)) // Not a multipart upload
return Err(Error::BadRequest(
"Cannot process part_number argument: not a multipart upload".into(),
Review

minor missmatch with AWS : if not multipart, AWS consider everything is contained in part 1.

minor missmatch with AWS : if not multipart, AWS consider everything is contained in part 1.
));
}
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
if !version.has_part_number(pn) {
return Err(Error::BadRequest(format!(
"Part number {} does not exist",
pn
)));
}
let part_size: u64 = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == pn)
.map(|(_, b)| b.size)
.sum();
let n_parts = version.parts_etags.items().len();
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.header("x-amz-mp-parts-count", format!("{}", n_parts))
.status(StatusCode::OK) .status(StatusCode::OK)
Review

AWS returns byte range headers and HTTP 206 (Partial Content) as if it was a range request

Accept-Ranges: bytes
Content-Range: bytes 0-147/148
AWS returns byte range headers and HTTP 206 (Partial Content) as if it was a range request ``` Accept-Ranges: bytes Content-Range: bytes 0-147/148 ```
.body(body) .body(Body::empty())?)
.unwrap(); } else {
Ok(response) Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(Body::empty())?)
}
} }
/// Handle GET request /// Handle GET request
@ -126,6 +167,7 @@ pub async fn handle_get(
req: &Request<Body>, req: &Request<Body>,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let object = garage let object = garage
.object_table .object_table
@ -154,22 +196,13 @@ pub async fn handle_get(
return Ok(cached); return Ok(cached);
} }
let range = match req.headers().get("range") { if let Some(pn) = part_number {
Some(range) => { return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
let range_str = range.to_str()?;
let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|e| (e, last_v_meta.size))?;
if ranges.len() > 1 {
// garage does not support multi-range requests yet, so we respond with the entire
// object when multiple ranges are requested
None
} else {
ranges.pop()
} }
}
None => None, // No part_number specified, it's a normal get object
};
if let Some(range) = range { if let Some(range) = parse_range_header(req, last_v_meta.size)? {
return handle_get_range( return handle_get_range(
garage, garage,
last_v, last_v,
@ -182,7 +215,7 @@ pub async fn handle_get(
} }
let resp_builder = object_headers(last_v, last_v_meta) let resp_builder = object_headers(last_v, last_v_meta)
.header("Content-Length", format!("{}", last_v_meta.size)) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK); .status(StatusCode::OK);
match &last_v_data { match &last_v_data {
@ -238,9 +271,9 @@ async fn handle_get_range(
end: u64, end: u64,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let resp_builder = object_headers(version, version_meta) let resp_builder = object_headers(version, version_meta)
.header("Content-Length", format!("{}", end - begin)) .header(CONTENT_LENGTH, format!("{}", end - begin))
.header( .header(
"Content-Range", CONTENT_RANGE,
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
) )
.status(StatusCode::PARTIAL_CONTENT); .status(StatusCode::PARTIAL_CONTENT);
@ -258,23 +291,113 @@ async fn handle_get_range(
} }
} }
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?; let version = garage
let version = match version { .version_table
Some(v) => v, .get(&version.uuid, &EmptyKey)
None => return Err(Error::NoSuchKey), .await?
.ok_or(Error::NoSuchKey)?;
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
}
async fn handle_get_part(
garage: Arc<Garage>,
req: &Request<Body>,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
) -> Result<Response<Body>, Error> {
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?
} else {
return Err(Error::BadRequest(
"Cannot handle part_number: not a multipart upload.".into(),
Review

Same as above

Same as above
));
}; };
let blocks = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == part_number)
.cloned()
.collect::<Vec<_>>();
if blocks.is_empty() {
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
}
let part_size = blocks.iter().map(|(_, b)| b.size).sum();
if let Some(r) = parse_range_header(req, part_size)? {
Review

AWS does not support this kind of request (InvalidRequest: Cannot specify both Range header and partNumber query parameter).
I'm not sure if we should support it, S3 libraries won't ever call that, and it's additional code to maitain. On the other hand, it's not a lot of code, so this message is purely informative, and not necessary requiring any changes

AWS does not support this kind of request (InvalidRequest: Cannot specify both Range header and partNumber query parameter). I'm not sure if we should support it, S3 libraries won't ever call that, and it's additional code to maitain. On the other hand, it's not a lot of code, so this message is purely informative, and not necessary requiring any changes
let range_begin = r.start;
let range_end = r.start + r.length;
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
.header(
CONTENT_RANGE,
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
)
.status(StatusCode::PARTIAL_CONTENT)
.body(body)?)
} else {
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.status(StatusCode::OK)
Review

Same as above

Same as above
.body(body)?)
}
}
fn parse_range_header(
req: &Request<Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
Some(range) => {
let range_str = range.to_str()?;
let mut ranges =
http_range::HttpRange::parse(range_str, total_size).map_err(|e| (e, total_size))?;
if ranges.len() > 1 {
// garage does not support multi-range requests yet, so we respond with the entire
// object when multiple ranges are requested
None
} else {
ranges.pop()
}
}
None => None,
};
Ok(range)
}
fn body_from_blocks_range(
garage: Arc<Garage>,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
) -> Body {
// We will store here the list of blocks that have an intersection with the requested // We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete // range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART // file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload) // block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min( let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
version.blocks.len(), all_blocks.len(),
4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024)) 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
as usize,
)); ));
let mut true_offset = 0; let mut true_offset = 0;
for (_, b) in version.blocks.items().iter() { for (_, b) in all_blocks.iter() {
if true_offset >= end { if true_offset >= end {
break; break;
} }
@ -308,8 +431,5 @@ async fn handle_get_range(
}) })
.buffered(2); .buffered(2);
let body = hyper::body::Body::wrap_stream(body_stream); hyper::body::Body::wrap_stream(body_stream)
Ok(resp_builder.body(body)?)
}
}
} }

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
@ -520,6 +520,7 @@ pub async fn handle_complete_multipart_upload(
let version_uuid = decode_upload_id(upload_id)?; let version_uuid = decode_upload_id(upload_id)?;
// Get object and version
let key = key.to_string(); let key = key.to_string();
let (object, version) = futures::try_join!( let (object, version) = futures::try_join!(
garage.object_table.get(&bucket_id, &key), garage.object_table.get(&bucket_id, &key),
@ -544,6 +545,20 @@ pub async fn handle_complete_multipart_upload(
_ => unreachable!(), _ => unreachable!(),
}; };
// Check that part numbers are an increasing sequence.
// (it doesn't need to start at 1 nor to be a continuous sequence,
// see discussion in #192)
if body_list_of_parts.is_empty() {
return Err(Error::EntityTooSmall);
}
if !body_list_of_parts
.iter()
.zip(body_list_of_parts.iter().skip(1))
.all(|(p1, p2)| p1.part_number < p2.part_number)
{
return Err(Error::InvalidPartOrder);
}
// Check that the list of parts they gave us corresponds to the parts we have here // Check that the list of parts they gave us corresponds to the parts we have here
debug!("Expected parts from request: {:?}", body_list_of_parts); debug!("Expected parts from request: {:?}", body_list_of_parts);
debug!("Parts stored in version: {:?}", version.parts_etags.items()); debug!("Parts stored in version: {:?}", version.parts_etags.items());
@ -556,18 +571,31 @@ pub async fn handle_complete_multipart_upload(
.iter() .iter()
.map(|x| (&x.part_number, &x.etag)) .map(|x| (&x.part_number, &x.etag))
.eq(parts); .eq(parts);
if !same_parts {
return Err(Error::InvalidPart);
}
// Check that all blocks belong to one of the parts
let block_parts = version
.blocks
.items()
.iter()
.map(|(bk, _)| bk.part_number)
.collect::<BTreeSet<_>>();
let same_parts = body_list_of_parts
.iter()
.map(|x| x.part_number)
.eq(block_parts.into_iter());
if !same_parts { if !same_parts {
return Err(Error::BadRequest( return Err(Error::BadRequest(
"We don't have the same parts".to_string(), "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
)); ));
} }
// Calculate etag of final object // Calculate etag of final object
// To understand how etags are calculated, read more here: // To understand how etags are calculated, read more here:
// https://teppen.io/2018/06/23/aws_s3_etags/ // https://teppen.io/2018/06/23/aws_s3_etags/
let num_parts = version.blocks.items().last().unwrap().0.part_number let num_parts = body_list_of_parts.len();
- version.blocks.items().first().unwrap().0.part_number
+ 1;
let mut etag_md5_hasher = Md5::new(); let mut etag_md5_hasher = Md5::new();
for (_, etag) in version.parts_etags.items().iter() { for (_, etag) in version.parts_etags.items().iter() {
etag_md5_hasher.update(etag.as_bytes()); etag_md5_hasher.update(etag.as_bytes());

View file

@ -134,8 +134,8 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
let ret_doc = match *req.method() { let ret_doc = match *req.method() {
Method::OPTIONS => handle_options(req, &bucket).await, Method::OPTIONS => handle_options(req, &bucket).await,
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key).await, Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key).await, Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::BadRequest("HTTP method not supported".into())), _ => Err(ApiError::BadRequest("HTTP method not supported".into())),
} }
.map_err(Error::from); .map_err(Error::from);
@ -166,7 +166,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
match handle_get(garage, &req2, bucket_id, &error_document).await { match handle_get(garage, &req2, bucket_id, &error_document, None).await {
Ok(mut error_doc) => { Ok(mut error_doc) => {
// The error won't be logged back in handle_request, // The error won't be logged back in handle_request,
// so log it here // so log it here