Handle part_number in HeadObject and GetObject (#191) #192

Merged
lx merged 2 commits from get-head-part-number into main 2022-01-24 21:00:34 +00:00
5 changed files with 268 additions and 100 deletions

View file

@@ -157,8 +157,12 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let resp = match endpoint {
Endpoint::Options => handle_options(&req, &bucket).await,
Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await,
Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await,
Endpoint::HeadObject {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
Endpoint::GetObject {
key, part_number, ..
} => handle_get(garage, &req, bucket_id, &key, part_number).await,
Endpoint::UploadPart {
key,
part_number,

View file

@@ -58,6 +58,19 @@ pub enum Error {
#[error(display = "At least one of the preconditions you specified did not hold")]
PreconditionFailed,
/// Parts specified in CMU request do not match parts actually uploaded
#[error(display = "Parts given to CompleteMultipartUpload do not match uploaded parts")]
InvalidPart,
/// Parts given to CompleteMultipartUpload were not in ascending order
#[error(display = "Parts given to CompleteMultipartUpload were not in ascending order")]
InvalidPartOrder,
/// In CompleteMultipartUpload: not enough data
/// (here we are more lenient than AWS S3)
#[error(display = "Proposed upload is smaller than the minimum allowed object size")]
EntityTooSmall,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
@@ -143,6 +156,9 @@ impl Error {
Error::BucketAlreadyExists => "BucketAlreadyExists",
Error::BucketNotEmpty => "BucketNotEmpty",
Error::PreconditionFailed => "PreconditionFailed",
Error::InvalidPart => "InvalidPart",
Error::InvalidPartOrder => "InvalidPartOrder",
Error::EntityTooSmall => "EntityTooSmall",
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",

View file

@@ -3,6 +3,10 @@ use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*;
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
@@ -11,6 +15,7 @@ use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
@@ -24,15 +29,12 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
.header(
"Content-Type",
version_meta.headers.content_type.to_string(),
)
.header("Last-Modified", date_str)
.header("Accept-Ranges", "bytes".to_string());
.header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
if !version_meta.etag.is_empty() {
resp = resp.header("ETag", format!("\"{}\"", version_meta.etag));
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
for (k, v) in version_meta.headers.other.iter() {
@@ -52,7 +54,7 @@ fn try_answer_cached(
// precedence and If-Modified-Since is ignored (as per section 6 "Precedence" of RFC 7232). The rationale
// being that etag-based matching is more accurate; it has no issue with sub-second precision,
// for instance (in case of very fast updates)
let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) {
let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
let none_match = none_match.to_str().ok()?;
let expected = format!("\"{}\"", version_meta.etag);
let found = none_match
@@ -60,7 +62,7 @@
.map(str::trim)
.any(|etag| etag == expected || etag == "\"*\"");
found
} else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) {
} else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
let modified_since = modified_since.to_str().ok()?;
let client_date = httpdate::parse_http_date(modified_since).ok()?;
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
@@ -87,6 +89,7 @@ pub async fn handle_head(
req: &Request<Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> {
let object = garage
.object_table
@@ -94,30 +97,68 @@ pub async fn handle_head(
.await?
.ok_or(Error::NoSuchKey)?;
let version = object
let object_version = object
.versions()
.iter()
.rev()
.find(|v| v.is_data())
.ok_or(Error::NoSuchKey)?;
let version_meta = match &version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
let version_data = match &object_version.state {
ObjectVersionState::Complete(c) => c,
_ => unreachable!(),
};
if let Some(cached) = try_answer_cached(version, version_meta, req) {
let version_meta = match version_data {
ObjectVersionData::Inline(meta, _) => meta,
ObjectVersionData::FirstBlock(meta, _) => meta,
_ => unreachable!(),
};
if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
return Ok(cached);
}
let body: Body = Body::empty();
let response = object_headers(version, version_meta)
.header("Content-Length", format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(body)
.unwrap();
Ok(response)
if let Some(pn) = part_number {
if let ObjectVersionData::Inline(_, _) = version_data {
// Not a multipart upload
return Err(Error::BadRequest(
"Cannot process part_number argument: not a multipart upload".into(),
Review

Minor mismatch with AWS: if the object is not a multipart upload, AWS considers everything to be contained in part 1.
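For illustration, the AWS-compatible behavior could look something like the following sketch, reusing the identifiers already in scope here (hypothetical, not what this PR implements):

```rust
// Hypothetical: treat a non-multipart object as a single part 1,
// as AWS does, instead of rejecting the request.
if let ObjectVersionData::Inline(_, _) = version_data {
    return match pn {
        1 => Ok(object_headers(object_version, version_meta)
            .header(CONTENT_LENGTH, format!("{}", version_meta.size))
            .header("x-amz-mp-parts-count", "1")
            .status(StatusCode::OK)
            .body(Body::empty())?),
        _ => Err(Error::BadRequest(format!(
            "Part number {} does not exist",
            pn
        ))),
    };
}
```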
));
}
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
if !version.has_part_number(pn) {
return Err(Error::BadRequest(format!(
"Part number {} does not exist",
pn
)));
}
let part_size: u64 = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == pn)
.map(|(_, b)| b.size)
.sum();
let n_parts = version.parts_etags.items().len();
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.header("x-amz-mp-parts-count", format!("{}", n_parts))
.status(StatusCode::OK)
Review

AWS returns byte-range headers and HTTP 206 (Partial Content), as if it were a range request:

```
Accept-Ranges: bytes
Content-Range: bytes 0-147/148
```
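For illustration, a 206 variant along those lines could reuse part_size and n_parts computed above (hypothetical sketch, not what was merged):

```rust
// Hypothetical: compute the part's offset within the whole object, then
// answer 206 with a Content-Range. Blocks are sorted by (part_number,
// offset), so take_while stops at the first block of part pn.
let part_offset: u64 = version
    .blocks
    .items()
    .iter()
    .take_while(|(k, _)| k.part_number < pn)
    .map(|(_, b)| b.size)
    .sum();
Ok(object_headers(object_version, version_meta)
    .header(CONTENT_LENGTH, format!("{}", part_size))
    .header(
        CONTENT_RANGE,
        format!(
            "bytes {}-{}/{}",
            part_offset,
            part_offset + part_size - 1,
            version_meta.size
        ),
    )
    .header("x-amz-mp-parts-count", format!("{}", n_parts))
    .status(StatusCode::PARTIAL_CONTENT)
    .body(Body::empty())?)
```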
.body(Body::empty())?)
} else {
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(Body::empty())?)
}
}
/// Handle GET request
@@ -126,6 +167,7 @@ pub async fn handle_get(
req: &Request<Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
) -> Result<Response<Body>, Error> {
let object = garage
.object_table
@@ -154,22 +196,13 @@ pub async fn handle_get(
return Ok(cached);
}
let range = match req.headers().get("range") {
Some(range) => {
let range_str = range.to_str()?;
let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|e| (e, last_v_meta.size))?;
if ranges.len() > 1 {
// garage does not support multi-range requests yet, so we respond with the entire
// object when multiple ranges are requested
None
} else {
ranges.pop()
}
}
None => None,
};
if let Some(range) = range {
if let Some(pn) = part_number {
return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
}
// No part_number specified, it's a normal get object
if let Some(range) = parse_range_header(req, last_v_meta.size)? {
return handle_get_range(
garage,
last_v,
@@ -182,7 +215,7 @@
}
let resp_builder = object_headers(last_v, last_v_meta)
.header("Content-Length", format!("{}", last_v_meta.size))
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
.status(StatusCode::OK);
match &last_v_data {
@@ -238,9 +271,9 @@ async fn handle_get_range(
end: u64,
) -> Result<Response<Body>, Error> {
let resp_builder = object_headers(version, version_meta)
.header("Content-Length", format!("{}", end - begin))
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
"Content-Range",
CONTENT_RANGE,
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
)
.status(StatusCode::PARTIAL_CONTENT);
@@ -258,58 +291,145 @@
}
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
let version = match version {
Some(v) => v,
None => return Err(Error::NoSuchKey),
};
let version = garage
.version_table
.get(&version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
// We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min(
version.blocks.len(),
4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
as usize,
));
let mut true_offset = 0;
for (_, b) in version.blocks.items().iter() {
if true_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
if true_offset < end && true_offset + b.size > begin {
blocks.push((*b, true_offset));
}
true_offset += b.size;
}
let body_stream = futures::stream::iter(blocks)
.map(move |(block, true_offset)| {
let garage = garage.clone();
async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
let data = Bytes::from(data);
let start_in_block = if true_offset > begin {
0
} else {
begin - true_offset
};
let end_in_block = if true_offset + block.size < end {
block.size
} else {
end - true_offset
};
Result::<Bytes, Error>::Ok(
data.slice(start_in_block as usize..end_in_block as usize),
)
}
})
.buffered(2);
let body = hyper::body::Body::wrap_stream(body_stream);
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
}
async fn handle_get_part(
garage: Arc<Garage>,
req: &Request<Body>,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
) -> Result<Response<Body>, Error> {
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?
} else {
return Err(Error::BadRequest(
"Cannot handle part_number: not a multipart upload.".into(),
Review

Same as above
));
};
let blocks = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == part_number)
.cloned()
.collect::<Vec<_>>();
if blocks.is_empty() {
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
}
let part_size = blocks.iter().map(|(_, b)| b.size).sum();
if let Some(r) = parse_range_header(req, part_size)? {
Review

AWS does not support this kind of request (InvalidRequest: Cannot specify both Range header and partNumber query parameter).
I'm not sure if we should support it: S3 libraries won't ever call it, and it's additional code to maintain. On the other hand, it's not a lot of code, so this comment is purely informative, and does not necessarily require any changes.
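If the strict AWS behavior were ever wanted instead, it would be a small guard at the top of this function (hypothetical sketch):

```rust
// Hypothetical: refuse to combine a Range header with partNumber,
// mirroring AWS's InvalidRequest error, instead of serving a sub-range.
if req.headers().contains_key(RANGE) {
    return Err(Error::BadRequest(
        "Cannot specify both Range header and partNumber query parameter".into(),
    ));
}
```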
let range_begin = r.start;
let range_end = r.start + r.length;
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
.header(
CONTENT_RANGE,
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
)
.status(StatusCode::PARTIAL_CONTENT)
.body(body)?)
} else {
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.status(StatusCode::OK)
Review

Same as above
.body(body)?)
}
}
fn parse_range_header(
req: &Request<Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
Some(range) => {
let range_str = range.to_str()?;
let mut ranges =
http_range::HttpRange::parse(range_str, total_size).map_err(|e| (e, total_size))?;
if ranges.len() > 1 {
// garage does not support multi-range requests yet, so we respond with the entire
// object when multiple ranges are requested
None
} else {
ranges.pop()
}
}
None => None,
};
Ok(range)
}
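As a note on the multi-range fallback above, a sketch of the case this helper deliberately ignores (hypothetical header value):

```rust
#[test]
fn multi_range_is_ignored() {
    // Two ranges in one header: HttpRange::parse returns both, len() > 1,
    // so parse_range_header yields None and the whole object is served.
    let ranges = http_range::HttpRange::parse("bytes=0-99,200-299", 1000).unwrap();
    assert_eq!(ranges.len(), 2);
}
```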
fn body_from_blocks_range(
garage: Arc<Garage>,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
) -> Body {
// We will store here the list of blocks that have an intersection with the requested
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
));
let mut true_offset = 0;
for (_, b) in all_blocks.iter() {
if true_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
if true_offset < end && true_offset + b.size > begin {
blocks.push((*b, true_offset));
}
true_offset += b.size;
}
let body_stream = futures::stream::iter(blocks)
.map(move |(block, true_offset)| {
let garage = garage.clone();
async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
let data = Bytes::from(data);
let start_in_block = if true_offset > begin {
0
} else {
begin - true_offset
};
let end_in_block = if true_offset + block.size < end {
block.size
} else {
end - true_offset
};
Result::<Bytes, Error>::Ok(
data.slice(start_in_block as usize..end_in_block as usize),
)
}
})
.buffered(2);
hyper::body::Body::wrap_stream(body_stream)
}
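To make the "true offset" arithmetic in body_from_blocks_range concrete, here is a standalone sketch with hypothetical block sizes (two 100-byte blocks in part 1, then a 48-byte block in part 2) and a request for bytes 150..220 of the object:

```rust
// Returns (block index, start_in_block, end_in_block) for each block
// intersecting the range begin..end, mirroring the loop above.
fn slices(block_sizes: &[u64], begin: u64, end: u64) -> Vec<(usize, u64, u64)> {
    let mut out = Vec::new();
    let mut true_offset = 0;
    for (i, &size) in block_sizes.iter().enumerate() {
        if true_offset < end && true_offset + size > begin {
            let start_in_block = begin.saturating_sub(true_offset);
            let end_in_block = std::cmp::min(size, end - true_offset);
            out.push((i, start_in_block, end_in_block));
        }
        true_offset += size;
    }
    out
}

#[test]
fn slices_example() {
    // Block 0 (bytes 0..100) has no intersection; block 1 (100..200) is
    // sliced to its bytes 50..100; block 2 (200..248) to its bytes 0..20.
    assert_eq!(slices(&[100, 100, 48], 150, 220), vec![(1, 50, 100), (2, 0, 20)]);
}
```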

View file

@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
@@ -520,6 +520,7 @@ pub async fn handle_complete_multipart_upload(
let version_uuid = decode_upload_id(upload_id)?;
// Get object and version
let key = key.to_string();
let (object, version) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
@@ -544,6 +545,20 @@
_ => unreachable!(),
};
// Check that part numbers are an increasing sequence.
// (it doesn't need to start at 1 nor to be a continuous sequence,
// see discussion in #192)
if body_list_of_parts.is_empty() {
return Err(Error::EntityTooSmall);
}
if !body_list_of_parts
.iter()
.zip(body_list_of_parts.iter().skip(1))
.all(|(p1, p2)| p1.part_number < p2.part_number)
{
return Err(Error::InvalidPartOrder);
}
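Restated on its own, the check accepts any strictly increasing sequence (hypothetical values below, not from the PR):

```rust
// Standalone restatement of the ordering rule checked above.
fn parts_in_order(part_numbers: &[u64]) -> bool {
    part_numbers.windows(2).all(|w| w[0] < w[1])
}

#[test]
fn part_order_examples() {
    assert!(parts_in_order(&[2, 5, 9])); // gaps and a non-1 start are fine
    assert!(!parts_in_order(&[1, 1, 2])); // duplicate part numbers are rejected
    assert!(!parts_in_order(&[3, 2])); // decreasing order is rejected
    // an empty list is rejected separately with EntityTooSmall
}
```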
// Check that the list of parts they gave us corresponds to the parts we have here
debug!("Expected parts from request: {:?}", body_list_of_parts);
debug!("Parts stored in version: {:?}", version.parts_etags.items());
@@ -556,18 +571,31 @@
.iter()
.map(|x| (&x.part_number, &x.etag))
.eq(parts);
if !same_parts {
return Err(Error::InvalidPart);
}
// Check that all blocks belong to one of the parts
let block_parts = version
.blocks
.items()
.iter()
.map(|(bk, _)| bk.part_number)
.collect::<BTreeSet<_>>();
let same_parts = body_list_of_parts
.iter()
.map(|x| x.part_number)
.eq(block_parts.into_iter());
if !same_parts {
return Err(Error::BadRequest(
"We don't have the same parts".to_string(),
"Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
));
}
// Calculate etag of final object
// To understand how etags are calculated, read more here:
// https://teppen.io/2018/06/23/aws_s3_etags/
let num_parts = version.blocks.items().last().unwrap().0.part_number
- version.blocks.items().first().unwrap().0.part_number
+ 1;
let num_parts = body_list_of_parts.len();
let mut etag_md5_hasher = Md5::new();
for (_, etag) in version.parts_etags.items().iter() {
etag_md5_hasher.update(etag.as_bytes());
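For context, the scheme described in the linked article: the multipart ETag is the hex MD5 of the concatenated binary MD5 digests of the individual parts, followed by "-" and the number of parts. A sketch using the md5 crate (illustrative only, not this function's exact code):

```rust
// S3-style multipart ETag for a hypothetical list of part bodies.
fn multipart_etag(parts: &[&[u8]]) -> String {
    let mut concatenated = Vec::new();
    for part in parts {
        // binary (not hex) MD5 digest of each part, concatenated
        concatenated.extend_from_slice(&md5::compute(*part).0);
    }
    format!("{:x}-{}", md5::compute(&concatenated), parts.len())
}

// For a two-part upload, the result is the hex MD5 of the two
// concatenated part digests, then "-2" for the part count.
```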

View file

@@ -134,8 +134,8 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options(req, &bucket).await,
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key).await,
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::BadRequest("HTTP method not supported".into())),
}
.map_err(Error::from);
@@ -166,7 +166,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
.body(Body::empty())
.unwrap();
match handle_get(garage, &req2, bucket_id, &error_document).await {
match handle_get(garage, &req2, bucket_id, &error_document, None).await {
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
// so log it here