Handle part_number in HeadObject and GetObject (#191) #192
5 changed files with 268 additions and 100 deletions
|
@ -157,8 +157,12 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
||||||
|
|
||||||
let resp = match endpoint {
|
let resp = match endpoint {
|
||||||
Endpoint::Options => handle_options(&req, &bucket).await,
|
Endpoint::Options => handle_options(&req, &bucket).await,
|
||||||
Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await,
|
Endpoint::HeadObject {
|
||||||
Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await,
|
key, part_number, ..
|
||||||
|
} => handle_head(garage, &req, bucket_id, &key, part_number).await,
|
||||||
|
Endpoint::GetObject {
|
||||||
|
key, part_number, ..
|
||||||
|
} => handle_get(garage, &req, bucket_id, &key, part_number).await,
|
||||||
Endpoint::UploadPart {
|
Endpoint::UploadPart {
|
||||||
key,
|
key,
|
||||||
part_number,
|
part_number,
|
||||||
|
|
|
@ -58,6 +58,19 @@ pub enum Error {
|
||||||
#[error(display = "At least one of the preconditions you specified did not hold")]
|
#[error(display = "At least one of the preconditions you specified did not hold")]
|
||||||
PreconditionFailed,
|
PreconditionFailed,
|
||||||
|
|
||||||
|
/// Parts specified in CMU request do not match parts actually uploaded
|
||||||
|
#[error(display = "Parts given to CompleteMultipartUpload do not match uploaded parts")]
|
||||||
|
InvalidPart,
|
||||||
|
|
||||||
|
/// Parts given to CompleteMultipartUpload were not in ascending order
|
||||||
|
#[error(display = "Parts given to CompleteMultipartUpload were not in ascending order")]
|
||||||
|
InvalidPartOrder,
|
||||||
|
|
||||||
|
/// In CompleteMultipartUpload: not enough data
|
||||||
|
/// (here we are more lenient than AWS S3)
|
||||||
|
#[error(display = "Proposed upload is smaller than the minimum allowed object size")]
|
||||||
|
EntityTooSmall,
|
||||||
|
|
||||||
// Category: bad request
|
// Category: bad request
|
||||||
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||||
|
@ -143,6 +156,9 @@ impl Error {
|
||||||
Error::BucketAlreadyExists => "BucketAlreadyExists",
|
Error::BucketAlreadyExists => "BucketAlreadyExists",
|
||||||
Error::BucketNotEmpty => "BucketNotEmpty",
|
Error::BucketNotEmpty => "BucketNotEmpty",
|
||||||
Error::PreconditionFailed => "PreconditionFailed",
|
Error::PreconditionFailed => "PreconditionFailed",
|
||||||
|
Error::InvalidPart => "InvalidPart",
|
||||||
|
Error::InvalidPartOrder => "InvalidPartOrder",
|
||||||
|
Error::EntityTooSmall => "EntityTooSmall",
|
||||||
Error::Forbidden(_) => "AccessDenied",
|
Error::Forbidden(_) => "AccessDenied",
|
||||||
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
|
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
|
||||||
Error::NotImplemented(_) => "NotImplemented",
|
Error::NotImplemented(_) => "NotImplemented",
|
||||||
|
|
|
@ -3,6 +3,10 @@ use std::sync::Arc;
|
||||||
use std::time::{Duration, UNIX_EPOCH};
|
use std::time::{Duration, UNIX_EPOCH};
|
||||||
|
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
|
use http::header::{
|
||||||
|
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
|
||||||
|
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
|
||||||
|
};
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
|
||||||
|
@ -11,6 +15,7 @@ use garage_util::data::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
|
@ -24,15 +29,12 @@ fn object_headers(
|
||||||
let date_str = httpdate::fmt_http_date(date);
|
let date_str = httpdate::fmt_http_date(date);
|
||||||
|
|
||||||
let mut resp = Response::builder()
|
let mut resp = Response::builder()
|
||||||
.header(
|
.header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
|
||||||
"Content-Type",
|
.header(LAST_MODIFIED, date_str)
|
||||||
version_meta.headers.content_type.to_string(),
|
.header(ACCEPT_RANGES, "bytes".to_string());
|
||||||
)
|
|
||||||
.header("Last-Modified", date_str)
|
|
||||||
.header("Accept-Ranges", "bytes".to_string());
|
|
||||||
|
|
||||||
if !version_meta.etag.is_empty() {
|
if !version_meta.etag.is_empty() {
|
||||||
resp = resp.header("ETag", format!("\"{}\"", version_meta.etag));
|
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (k, v) in version_meta.headers.other.iter() {
|
for (k, v) in version_meta.headers.other.iter() {
|
||||||
|
@ -52,7 +54,7 @@ fn try_answer_cached(
|
||||||
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
|
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
|
||||||
// being that etag based matching is more accurate, it has no issue with sub-second precision
|
// being that etag based matching is more accurate, it has no issue with sub-second precision
|
||||||
// for instance (in case of very fast updates)
|
// for instance (in case of very fast updates)
|
||||||
let cached = if let Some(none_match) = req.headers().get(http::header::IF_NONE_MATCH) {
|
let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
|
||||||
let none_match = none_match.to_str().ok()?;
|
let none_match = none_match.to_str().ok()?;
|
||||||
let expected = format!("\"{}\"", version_meta.etag);
|
let expected = format!("\"{}\"", version_meta.etag);
|
||||||
let found = none_match
|
let found = none_match
|
||||||
|
@ -60,7 +62,7 @@ fn try_answer_cached(
|
||||||
.map(str::trim)
|
.map(str::trim)
|
||||||
.any(|etag| etag == expected || etag == "\"*\"");
|
.any(|etag| etag == expected || etag == "\"*\"");
|
||||||
found
|
found
|
||||||
} else if let Some(modified_since) = req.headers().get(http::header::IF_MODIFIED_SINCE) {
|
} else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
|
||||||
let modified_since = modified_since.to_str().ok()?;
|
let modified_since = modified_since.to_str().ok()?;
|
||||||
let client_date = httpdate::parse_http_date(modified_since).ok()?;
|
let client_date = httpdate::parse_http_date(modified_since).ok()?;
|
||||||
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
|
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
|
||||||
|
@ -87,6 +89,7 @@ pub async fn handle_head(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &str,
|
||||||
|
part_number: Option<u64>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let object = garage
|
let object = garage
|
||||||
.object_table
|
.object_table
|
||||||
|
@ -94,30 +97,68 @@ pub async fn handle_head(
|
||||||
.await?
|
.await?
|
||||||
.ok_or(Error::NoSuchKey)?;
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
let version = object
|
let object_version = object
|
||||||
.versions()
|
.versions()
|
||||||
.iter()
|
.iter()
|
||||||
.rev()
|
.rev()
|
||||||
.find(|v| v.is_data())
|
.find(|v| v.is_data())
|
||||||
.ok_or(Error::NoSuchKey)?;
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
let version_meta = match &version.state {
|
let version_data = match &object_version.state {
|
||||||
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
|
ObjectVersionState::Complete(c) => c,
|
||||||
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(cached) = try_answer_cached(version, version_meta, req) {
|
let version_meta = match version_data {
|
||||||
|
ObjectVersionData::Inline(meta, _) => meta,
|
||||||
|
ObjectVersionData::FirstBlock(meta, _) => meta,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body: Body = Body::empty();
|
if let Some(pn) = part_number {
|
||||||
let response = object_headers(version, version_meta)
|
if let ObjectVersionData::Inline(_, _) = version_data {
|
||||||
.header("Content-Length", format!("{}", version_meta.size))
|
// Not a multipart upload
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
"Cannot process part_number argument: not a multipart upload".into(),
|
||||||
|
|||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let version = garage
|
||||||
|
.version_table
|
||||||
|
.get(&object_version.uuid, &EmptyKey)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
if !version.has_part_number(pn) {
|
||||||
|
return Err(Error::BadRequest(format!(
|
||||||
|
"Part number {} does not exist",
|
||||||
|
pn
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let part_size: u64 = version
|
||||||
|
.blocks
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter(|(k, _)| k.part_number == pn)
|
||||||
|
.map(|(_, b)| b.size)
|
||||||
|
.sum();
|
||||||
|
let n_parts = version.parts_etags.items().len();
|
||||||
|
|
||||||
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", part_size))
|
||||||
|
.header("x-amz-mp-parts-count", format!("{}", n_parts))
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
trinity-1686a
commented
AWS returns byte range headers and HTTP 206 (Partial Content) as if it was a range request
AWS returns byte range headers and HTTP 206 (Partial Content) as if it was a range request
```
Accept-Ranges: bytes
Content-Range: bytes 0-147/148
```
|
|||||||
.body(body)
|
.body(Body::empty())?)
|
||||||
.unwrap();
|
} else {
|
||||||
Ok(response)
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle GET request
|
/// Handle GET request
|
||||||
|
@ -126,6 +167,7 @@ pub async fn handle_get(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &str,
|
||||||
|
part_number: Option<u64>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let object = garage
|
let object = garage
|
||||||
.object_table
|
.object_table
|
||||||
|
@ -154,22 +196,13 @@ pub async fn handle_get(
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
let range = match req.headers().get("range") {
|
if let Some(pn) = part_number {
|
||||||
Some(range) => {
|
return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
|
||||||
let range_str = range.to_str()?;
|
|
||||||
let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
|
|
||||||
.map_err(|e| (e, last_v_meta.size))?;
|
|
||||||
if ranges.len() > 1 {
|
|
||||||
// garage does not support multi-range requests yet, so we respond with the entire
|
|
||||||
// object when multiple ranges are requested
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
ranges.pop()
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
None => None,
|
// No part_number specified, it's a normal get object
|
||||||
};
|
|
||||||
if let Some(range) = range {
|
if let Some(range) = parse_range_header(req, last_v_meta.size)? {
|
||||||
return handle_get_range(
|
return handle_get_range(
|
||||||
garage,
|
garage,
|
||||||
last_v,
|
last_v,
|
||||||
|
@ -182,7 +215,7 @@ pub async fn handle_get(
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(last_v, last_v_meta)
|
let resp_builder = object_headers(last_v, last_v_meta)
|
||||||
.header("Content-Length", format!("{}", last_v_meta.size))
|
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
|
||||||
.status(StatusCode::OK);
|
.status(StatusCode::OK);
|
||||||
|
|
||||||
match &last_v_data {
|
match &last_v_data {
|
||||||
|
@ -238,9 +271,9 @@ async fn handle_get_range(
|
||||||
end: u64,
|
end: u64,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let resp_builder = object_headers(version, version_meta)
|
let resp_builder = object_headers(version, version_meta)
|
||||||
.header("Content-Length", format!("{}", end - begin))
|
.header(CONTENT_LENGTH, format!("{}", end - begin))
|
||||||
.header(
|
.header(
|
||||||
"Content-Range",
|
CONTENT_RANGE,
|
||||||
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
|
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
|
||||||
)
|
)
|
||||||
.status(StatusCode::PARTIAL_CONTENT);
|
.status(StatusCode::PARTIAL_CONTENT);
|
||||||
|
@ -258,23 +291,113 @@ async fn handle_get_range(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
|
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
|
||||||
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
|
let version = garage
|
||||||
let version = match version {
|
.version_table
|
||||||
Some(v) => v,
|
.get(&version.uuid, &EmptyKey)
|
||||||
None => return Err(Error::NoSuchKey),
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
|
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
|
||||||
|
Ok(resp_builder.body(body)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_get_part(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
req: &Request<Body>,
|
||||||
|
object_version: &ObjectVersion,
|
||||||
|
version_data: &ObjectVersionData,
|
||||||
|
version_meta: &ObjectVersionMeta,
|
||||||
|
part_number: u64,
|
||||||
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
|
||||||
|
garage
|
||||||
|
.version_table
|
||||||
|
.get(&object_version.uuid, &EmptyKey)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?
|
||||||
|
} else {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
"Cannot handle part_number: not a multipart upload.".into(),
|
||||||
trinity-1686a
commented
Same as above Same as above
|
|||||||
|
));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let blocks = version
|
||||||
|
.blocks
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter(|(k, _)| k.part_number == part_number)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
if blocks.is_empty() {
|
||||||
|
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let part_size = blocks.iter().map(|(_, b)| b.size).sum();
|
||||||
|
|
||||||
|
if let Some(r) = parse_range_header(req, part_size)? {
|
||||||
trinity-1686a
commented
AWS does not support this kind of request (InvalidRequest: Cannot specify both Range header and partNumber query parameter). AWS does not support this kind of request (InvalidRequest: Cannot specify both Range header and partNumber query parameter).
I'm not sure if we should support it, S3 libraries won't ever call that, and it's additional code to maitain. On the other hand, it's not a lot of code, so this message is purely informative, and not necessary requiring any changes
|
|||||||
|
let range_begin = r.start;
|
||||||
|
let range_end = r.start + r.length;
|
||||||
|
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
|
||||||
|
|
||||||
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
|
||||||
|
.header(
|
||||||
|
CONTENT_RANGE,
|
||||||
|
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
|
||||||
|
)
|
||||||
|
.status(StatusCode::PARTIAL_CONTENT)
|
||||||
|
.body(body)?)
|
||||||
|
} else {
|
||||||
|
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
|
||||||
|
|
||||||
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", part_size))
|
||||||
|
.status(StatusCode::OK)
|
||||||
trinity-1686a
commented
Same as above Same as above
|
|||||||
|
.body(body)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_range_header(
|
||||||
|
req: &Request<Body>,
|
||||||
|
total_size: u64,
|
||||||
|
) -> Result<Option<http_range::HttpRange>, Error> {
|
||||||
|
let range = match req.headers().get(RANGE) {
|
||||||
|
Some(range) => {
|
||||||
|
let range_str = range.to_str()?;
|
||||||
|
let mut ranges =
|
||||||
|
http_range::HttpRange::parse(range_str, total_size).map_err(|e| (e, total_size))?;
|
||||||
|
if ranges.len() > 1 {
|
||||||
|
// garage does not support multi-range requests yet, so we respond with the entire
|
||||||
|
// object when multiple ranges are requested
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
ranges.pop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
Ok(range)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn body_from_blocks_range(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
all_blocks: &[(VersionBlockKey, VersionBlock)],
|
||||||
|
begin: u64,
|
||||||
|
end: u64,
|
||||||
|
) -> Body {
|
||||||
// We will store here the list of blocks that have an intersection with the requested
|
// We will store here the list of blocks that have an intersection with the requested
|
||||||
// range, as well as their "true offset", which is their actual offset in the complete
|
// range, as well as their "true offset", which is their actual offset in the complete
|
||||||
// file (whereas block.offset designates the offset of the block WITHIN THE PART
|
// file (whereas block.offset designates the offset of the block WITHIN THE PART
|
||||||
// block.part_number, which is not the same in the case of a multipart upload)
|
// block.part_number, which is not the same in the case of a multipart upload)
|
||||||
let mut blocks = Vec::with_capacity(std::cmp::min(
|
let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
|
||||||
version.blocks.len(),
|
all_blocks.len(),
|
||||||
4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
|
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
|
||||||
as usize,
|
|
||||||
));
|
));
|
||||||
let mut true_offset = 0;
|
let mut true_offset = 0;
|
||||||
for (_, b) in version.blocks.items().iter() {
|
for (_, b) in all_blocks.iter() {
|
||||||
if true_offset >= end {
|
if true_offset >= end {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -308,8 +431,5 @@ async fn handle_get_range(
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
|
|
||||||
let body = hyper::body::Body::wrap_stream(body_stream);
|
hyper::body::Body::wrap_stream(body_stream)
|
||||||
Ok(resp_builder.body(body)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
|
@ -520,6 +520,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
|
|
||||||
let version_uuid = decode_upload_id(upload_id)?;
|
let version_uuid = decode_upload_id(upload_id)?;
|
||||||
|
|
||||||
|
// Get object and version
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let (object, version) = futures::try_join!(
|
let (object, version) = futures::try_join!(
|
||||||
garage.object_table.get(&bucket_id, &key),
|
garage.object_table.get(&bucket_id, &key),
|
||||||
|
@ -544,6 +545,20 @@ pub async fn handle_complete_multipart_upload(
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Check that part numbers are an increasing sequence.
|
||||||
|
// (it doesn't need to start at 1 nor to be a continuous sequence,
|
||||||
|
// see discussion in #192)
|
||||||
|
if body_list_of_parts.is_empty() {
|
||||||
|
return Err(Error::EntityTooSmall);
|
||||||
|
}
|
||||||
|
if !body_list_of_parts
|
||||||
|
.iter()
|
||||||
|
.zip(body_list_of_parts.iter().skip(1))
|
||||||
|
.all(|(p1, p2)| p1.part_number < p2.part_number)
|
||||||
|
{
|
||||||
|
return Err(Error::InvalidPartOrder);
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the list of parts they gave us corresponds to the parts we have here
|
// Check that the list of parts they gave us corresponds to the parts we have here
|
||||||
debug!("Expected parts from request: {:?}", body_list_of_parts);
|
debug!("Expected parts from request: {:?}", body_list_of_parts);
|
||||||
debug!("Parts stored in version: {:?}", version.parts_etags.items());
|
debug!("Parts stored in version: {:?}", version.parts_etags.items());
|
||||||
|
@ -556,18 +571,31 @@ pub async fn handle_complete_multipart_upload(
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| (&x.part_number, &x.etag))
|
.map(|x| (&x.part_number, &x.etag))
|
||||||
.eq(parts);
|
.eq(parts);
|
||||||
|
if !same_parts {
|
||||||
|
return Err(Error::InvalidPart);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that all blocks belong to one of the parts
|
||||||
|
let block_parts = version
|
||||||
|
.blocks
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.map(|(bk, _)| bk.part_number)
|
||||||
|
.collect::<BTreeSet<_>>();
|
||||||
|
let same_parts = body_list_of_parts
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.part_number)
|
||||||
|
.eq(block_parts.into_iter());
|
||||||
if !same_parts {
|
if !same_parts {
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
"We don't have the same parts".to_string(),
|
"Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again.".into(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate etag of final object
|
// Calculate etag of final object
|
||||||
// To understand how etags are calculated, read more here:
|
// To understand how etags are calculated, read more here:
|
||||||
// https://teppen.io/2018/06/23/aws_s3_etags/
|
// https://teppen.io/2018/06/23/aws_s3_etags/
|
||||||
let num_parts = version.blocks.items().last().unwrap().0.part_number
|
let num_parts = body_list_of_parts.len();
|
||||||
- version.blocks.items().first().unwrap().0.part_number
|
|
||||||
+ 1;
|
|
||||||
let mut etag_md5_hasher = Md5::new();
|
let mut etag_md5_hasher = Md5::new();
|
||||||
for (_, etag) in version.parts_etags.items().iter() {
|
for (_, etag) in version.parts_etags.items().iter() {
|
||||||
etag_md5_hasher.update(etag.as_bytes());
|
etag_md5_hasher.update(etag.as_bytes());
|
||||||
|
|
|
@ -134,8 +134,8 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
|
||||||
|
|
||||||
let ret_doc = match *req.method() {
|
let ret_doc = match *req.method() {
|
||||||
Method::OPTIONS => handle_options(req, &bucket).await,
|
Method::OPTIONS => handle_options(req, &bucket).await,
|
||||||
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key).await,
|
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
|
||||||
Method::GET => handle_get(garage.clone(), req, bucket_id, &key).await,
|
Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
|
||||||
_ => Err(ApiError::BadRequest("HTTP method not supported".into())),
|
_ => Err(ApiError::BadRequest("HTTP method not supported".into())),
|
||||||
}
|
}
|
||||||
.map_err(Error::from);
|
.map_err(Error::from);
|
||||||
|
@ -166,7 +166,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
match handle_get(garage, &req2, bucket_id, &error_document).await {
|
match handle_get(garage, &req2, bucket_id, &error_document, None).await {
|
||||||
Ok(mut error_doc) => {
|
Ok(mut error_doc) => {
|
||||||
// The error won't be logged back in handle_request,
|
// The error won't be logged back in handle_request,
|
||||||
// so log it here
|
// so log it here
|
||||||
|
|
Loading…
Reference in a new issue
minor missmatch with AWS : if not multipart, AWS consider everything is contained in part 1.