Fix partNumber issues (fixes #201) #202
3 changed files with 135 additions and 98 deletions
|
@ -257,15 +257,15 @@ if [ -z "$SKIP_AWS" ]; then
|
||||||
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
||||||
[ $(jq '.Parts | length' $CMDOUT) == 0 ]
|
[ $(jq '.Parts | length' $CMDOUT) == 0 ]
|
||||||
[ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty
|
[ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty
|
||||||
ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 10 --body /tmp/garage.2.rnd | jq .ETag)
|
ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 1 --body /tmp/garage.2.rnd | jq .ETag)
|
||||||
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
||||||
[ $(jq '.Parts | length' $CMDOUT) == 1 ]
|
[ $(jq '.Parts | length' $CMDOUT) == 1 ]
|
||||||
[ $(jq '.Parts[0].PartNumber' $CMDOUT) == 10 ]
|
[ $(jq '.Parts[0].PartNumber' $CMDOUT) == 1 ]
|
||||||
[ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ]
|
[ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ]
|
||||||
[ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ]
|
[ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ]
|
||||||
|
|
||||||
ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 9999 --body /tmp/garage.3.rnd | jq .ETag)
|
ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 3 --body /tmp/garage.3.rnd | jq .ETag)
|
||||||
ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 30 --body /tmp/garage.2.rnd | jq .ETag)
|
ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 2 --body /tmp/garage.2.rnd | jq .ETag)
|
||||||
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
|
||||||
[ $(jq '.Parts | length' $CMDOUT) == 3 ]
|
[ $(jq '.Parts | length' $CMDOUT) == 3 ]
|
||||||
[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
|
[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
|
||||||
|
@ -279,15 +279,15 @@ if [ -z "$SKIP_AWS" ]; then
|
||||||
"Parts": [
|
"Parts": [
|
||||||
{
|
{
|
||||||
"ETag": $ETAG1,
|
"ETag": $ETAG1,
|
||||||
"PartNumber": 10
|
"PartNumber": 1
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ETag": $ETAG3,
|
"ETag": $ETAG3,
|
||||||
"PartNumber": 30
|
"PartNumber": 2
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"ETag": $ETAG2,
|
"ETag": $ETAG2,
|
||||||
"PartNumber": 9999
|
"PartNumber": 3
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@ use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
|
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
|
||||||
|
|
||||||
fn object_headers(
|
fn object_headers(
|
||||||
version: &ObjectVersion,
|
version: &ObjectVersion,
|
||||||
version_meta: &ObjectVersionMeta,
|
version_meta: &ObjectVersionMeta,
|
||||||
|
@ -120,39 +122,49 @@ pub async fn handle_head(
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(pn) = part_number {
|
if let Some(pn) = part_number {
|
||||||
if let ObjectVersionData::Inline(_, _) = version_data {
|
match version_data {
|
||||||
// Not a multipart upload
|
ObjectVersionData::Inline(_, bytes) => {
|
||||||
return Err(Error::BadRequest(
|
if pn != 1 {
|
||||||
"Cannot process part_number argument: not a multipart upload".into(),
|
return Err(Error::InvalidPart);
|
||||||
));
|
}
|
||||||
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
|
||||||
|
.header(
|
||||||
|
CONTENT_RANGE,
|
||||||
|
format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
|
||||||
|
)
|
||||||
|
.header(X_AMZ_MP_PARTS_COUNT, "1")
|
||||||
|
.status(StatusCode::PARTIAL_CONTENT)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
|
ObjectVersionData::FirstBlock(_, _) => {
|
||||||
|
let version = garage
|
||||||
|
.version_table
|
||||||
|
.get(&object_version.uuid, &EmptyKey)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
|
let (part_offset, part_end) =
|
||||||
|
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
|
||||||
|
let n_parts = version.parts_etags.items().len();
|
||||||
|
|
||||||
|
Ok(object_headers(object_version, version_meta)
|
||||||
|
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
|
||||||
|
.header(
|
||||||
|
CONTENT_RANGE,
|
||||||
|
format!(
|
||||||
|
"bytes {}-{}/{}",
|
||||||
|
part_offset,
|
||||||
|
part_end - 1,
|
||||||
|
version_meta.size
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
|
||||||
|
.status(StatusCode::PARTIAL_CONTENT)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
let version = garage
|
|
||||||
.version_table
|
|
||||||
.get(&object_version.uuid, &EmptyKey)
|
|
||||||
.await?
|
|
||||||
.ok_or(Error::NoSuchKey)?;
|
|
||||||
if !version.has_part_number(pn) {
|
|
||||||
return Err(Error::BadRequest(format!(
|
|
||||||
"Part number {} does not exist",
|
|
||||||
pn
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let part_size: u64 = version
|
|
||||||
.blocks
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.filter(|(k, _)| k.part_number == pn)
|
|
||||||
.map(|(_, b)| b.size)
|
|
||||||
.sum();
|
|
||||||
let n_parts = version.parts_etags.items().len();
|
|
||||||
|
|
||||||
Ok(object_headers(object_version, version_meta)
|
|
||||||
.header(CONTENT_LENGTH, format!("{}", part_size))
|
|
||||||
.header("x-amz-mp-parts-count", format!("{}", n_parts))
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(Body::empty())?)
|
|
||||||
} else {
|
} else {
|
||||||
Ok(object_headers(object_version, version_meta)
|
Ok(object_headers(object_version, version_meta)
|
||||||
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||||
|
@ -196,22 +208,27 @@ pub async fn handle_get(
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(pn) = part_number {
|
match (part_number, parse_range_header(req, last_v_meta.size)?) {
|
||||||
return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
|
(Some(_), Some(_)) => {
|
||||||
}
|
return Err(Error::BadRequest(
|
||||||
|
"Cannot specify both partNumber and Range header".into(),
|
||||||
// No part_number specified, it's a normal get object
|
));
|
||||||
|
}
|
||||||
if let Some(range) = parse_range_header(req, last_v_meta.size)? {
|
(Some(pn), None) => {
|
||||||
return handle_get_range(
|
return handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await;
|
||||||
garage,
|
}
|
||||||
last_v,
|
(None, Some(range)) => {
|
||||||
last_v_data,
|
return handle_get_range(
|
||||||
last_v_meta,
|
garage,
|
||||||
range.start,
|
last_v,
|
||||||
range.start + range.length,
|
last_v_data,
|
||||||
)
|
last_v_meta,
|
||||||
.await;
|
range.start,
|
||||||
|
range.start + range.length,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
(None, None) => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(last_v, last_v_meta)
|
let resp_builder = object_headers(last_v, last_v_meta)
|
||||||
|
@ -305,58 +322,51 @@ async fn handle_get_range(
|
||||||
|
|
||||||
async fn handle_get_part(
|
async fn handle_get_part(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<Body>,
|
|
||||||
object_version: &ObjectVersion,
|
object_version: &ObjectVersion,
|
||||||
version_data: &ObjectVersionData,
|
version_data: &ObjectVersionData,
|
||||||
version_meta: &ObjectVersionMeta,
|
version_meta: &ObjectVersionMeta,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
|
let resp_builder =
|
||||||
garage
|
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
|
||||||
.version_table
|
|
||||||
.get(&object_version.uuid, &EmptyKey)
|
|
||||||
.await?
|
|
||||||
.ok_or(Error::NoSuchKey)?
|
|
||||||
} else {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
"Cannot handle part_number: not a multipart upload.".into(),
|
|
||||||
));
|
|
||||||
};
|
|
||||||
|
|
||||||
let blocks = version
|
match version_data {
|
||||||
.blocks
|
ObjectVersionData::Inline(_, bytes) => {
|
||||||
.items()
|
if part_number != 1 {
|
||||||
.iter()
|
return Err(Error::InvalidPart);
|
||||||
.filter(|(k, _)| k.part_number == part_number)
|
}
|
||||||
.cloned()
|
Ok(resp_builder
|
||||||
.collect::<Vec<_>>();
|
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
|
||||||
|
.header(
|
||||||
|
CONTENT_RANGE,
|
||||||
|
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
|
||||||
|
)
|
||||||
|
.header(X_AMZ_MP_PARTS_COUNT, "1")
|
||||||
|
.body(Body::from(bytes.to_vec()))?)
|
||||||
|
}
|
||||||
|
ObjectVersionData::FirstBlock(_, _) => {
|
||||||
|
let version = garage
|
||||||
|
.version_table
|
||||||
|
.get(&object_version.uuid, &EmptyKey)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
if blocks.is_empty() {
|
let (begin, end) =
|
||||||
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
|
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
|
||||||
}
|
let n_parts = version.parts_etags.items().len();
|
||||||
|
|
||||||
let part_size = blocks.iter().map(|(_, b)| b.size).sum();
|
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
|
||||||
|
|
||||||
if let Some(r) = parse_range_header(req, part_size)? {
|
Ok(resp_builder
|
||||||
let range_begin = r.start;
|
.header(CONTENT_LENGTH, format!("{}", end - begin))
|
||||||
let range_end = r.start + r.length;
|
.header(
|
||||||
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
|
CONTENT_RANGE,
|
||||||
|
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
|
||||||
Ok(object_headers(object_version, version_meta)
|
)
|
||||||
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
|
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
|
||||||
.header(
|
.body(body)?)
|
||||||
CONTENT_RANGE,
|
}
|
||||||
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
|
_ => unreachable!(),
|
||||||
)
|
|
||||||
.status(StatusCode::PARTIAL_CONTENT)
|
|
||||||
.body(body)?)
|
|
||||||
} else {
|
|
||||||
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
|
|
||||||
|
|
||||||
Ok(object_headers(object_version, version_meta)
|
|
||||||
.header(CONTENT_LENGTH, format!("{}", part_size))
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.body(body)?)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,6 +392,22 @@ fn parse_range_header(
|
||||||
Ok(range)
|
Ok(range)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
|
||||||
|
let mut offset = 0;
|
||||||
|
for (i, (bk, bv)) in v.blocks.items().iter().enumerate() {
|
||||||
|
if bk.part_number == part_number {
|
||||||
|
let size: u64 = v.blocks.items()[i..]
|
||||||
|
.iter()
|
||||||
|
.take_while(|(k, _)| k.part_number == part_number)
|
||||||
|
.map(|(_, v)| v.size)
|
||||||
|
.sum();
|
||||||
|
return Some((offset, offset + size));
|
||||||
|
}
|
||||||
|
offset += bv.size;
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
fn body_from_blocks_range(
|
fn body_from_blocks_range(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
all_blocks: &[(VersionBlockKey, VersionBlock)],
|
all_blocks: &[(VersionBlockKey, VersionBlock)],
|
||||||
|
|
|
@ -559,6 +559,17 @@ pub async fn handle_complete_multipart_upload(
|
||||||
return Err(Error::InvalidPartOrder);
|
return Err(Error::InvalidPartOrder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Garage-specific restriction, see #204: part numbers must be
|
||||||
|
// consecutive starting at 1
|
||||||
|
if body_list_of_parts[0].part_number != 1
|
||||||
|
|| !body_list_of_parts
|
||||||
|
.iter()
|
||||||
|
.zip(body_list_of_parts.iter().skip(1))
|
||||||
|
.all(|(p1, p2)| p1.part_number + 1 == p2.part_number)
|
||||||
|
{
|
||||||
|
return Err(Error::NotImplemented("Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic.".into()));
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the list of parts they gave us corresponds to the parts we have here
|
// Check that the list of parts they gave us corresponds to the parts we have here
|
||||||
debug!("Expected parts from request: {:?}", body_list_of_parts);
|
debug!("Expected parts from request: {:?}", body_list_of_parts);
|
||||||
debug!("Parts stored in version: {:?}", version.parts_etags.items());
|
debug!("Parts stored in version: {:?}", version.parts_etags.items());
|
||||||
|
|
Loading…
Reference in a new issue