Fix partNumber issues (fixes #201) #202

Merged
lx merged 2 commits from get-head-part-number into main 2022-01-25 12:05:47 +00:00
3 changed files with 135 additions and 98 deletions

View file

@ -257,15 +257,15 @@ if [ -z "$SKIP_AWS" ]; then
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 0 ] [ $(jq '.Parts | length' $CMDOUT) == 0 ]
[ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty [ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty
ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 10 --body /tmp/garage.2.rnd | jq .ETag) ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 1 --body /tmp/garage.2.rnd | jq .ETag)
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 1 ] [ $(jq '.Parts | length' $CMDOUT) == 1 ]
[ $(jq '.Parts[0].PartNumber' $CMDOUT) == 10 ] [ $(jq '.Parts[0].PartNumber' $CMDOUT) == 1 ]
[ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ] [ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ]
[ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ] [ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ]
ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 9999 --body /tmp/garage.3.rnd | jq .ETag) ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 3 --body /tmp/garage.3.rnd | jq .ETag)
ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 30 --body /tmp/garage.2.rnd | jq .ETag) ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 2 --body /tmp/garage.2.rnd | jq .ETag)
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 3 ] [ $(jq '.Parts | length' $CMDOUT) == 3 ]
[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ] [ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
@ -279,15 +279,15 @@ if [ -z "$SKIP_AWS" ]; then
"Parts": [ "Parts": [
{ {
"ETag": $ETAG1, "ETag": $ETAG1,
"PartNumber": 10 "PartNumber": 1
}, },
{ {
"ETag": $ETAG3, "ETag": $ETAG3,
"PartNumber": 30 "PartNumber": 2
}, },
{ {
"ETag": $ETAG2, "ETag": $ETAG2,
"PartNumber": 9999 "PartNumber": 3
} }
] ]
} }

View file

@ -19,6 +19,8 @@ use garage_model::version_table::*;
use crate::error::*; use crate::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
fn object_headers( fn object_headers(
version: &ObjectVersion, version: &ObjectVersion,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
@ -120,39 +122,49 @@ pub async fn handle_head(
} }
if let Some(pn) = part_number { if let Some(pn) = part_number {
if let ObjectVersionData::Inline(_, _) = version_data { match version_data {
// Not a multipart upload ObjectVersionData::Inline(_, bytes) => {
return Err(Error::BadRequest( if pn != 1 {
"Cannot process part_number argument: not a multipart upload".into(), return Err(Error::InvalidPart);
));
} }
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
CONTENT_RANGE,
format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
.body(Body::empty())?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage let version = garage
.version_table .version_table
.get(&object_version.uuid, &EmptyKey) .get(&object_version.uuid, &EmptyKey)
.await? .await?
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
if !version.has_part_number(pn) {
return Err(Error::BadRequest(format!(
"Part number {} does not exist",
pn
)));
}
let part_size: u64 = version let (part_offset, part_end) =
.blocks calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
.items()
.iter()
.filter(|(k, _)| k.part_number == pn)
.map(|(_, b)| b.size)
.sum();
let n_parts = version.parts_etags.items().len(); let n_parts = version.parts_etags.items().len();
Ok(object_headers(object_version, version_meta) Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size)) .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
.header("x-amz-mp-parts-count", format!("{}", n_parts)) .header(
.status(StatusCode::OK) CONTENT_RANGE,
format!(
"bytes {}-{}/{}",
part_offset,
part_end - 1,
version_meta.size
),
)
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
.status(StatusCode::PARTIAL_CONTENT)
.body(Body::empty())?) .body(Body::empty())?)
}
_ => unreachable!(),
}
} else { } else {
Ok(object_headers(object_version, version_meta) Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", version_meta.size)) .header(CONTENT_LENGTH, format!("{}", version_meta.size))
@ -196,13 +208,16 @@ pub async fn handle_get(
return Ok(cached); return Ok(cached);
} }
if let Some(pn) = part_number { match (part_number, parse_range_header(req, last_v_meta.size)?) {
return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await; (Some(_), Some(_)) => {
return Err(Error::BadRequest(
"Cannot specify both partNumber and Range header".into(),
));
} }
(Some(pn), None) => {
// No part_number specified, it's a normal get object return handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await;
}
if let Some(range) = parse_range_header(req, last_v_meta.size)? { (None, Some(range)) => {
return handle_get_range( return handle_get_range(
garage, garage,
last_v, last_v,
@ -213,6 +228,8 @@ pub async fn handle_get(
) )
.await; .await;
} }
(None, None) => (),
}
let resp_builder = object_headers(last_v, last_v_meta) let resp_builder = object_headers(last_v, last_v_meta)
.header(CONTENT_LENGTH, format!("{}", last_v_meta.size)) .header(CONTENT_LENGTH, format!("{}", last_v_meta.size))
@ -305,59 +322,52 @@ async fn handle_get_range(
async fn handle_get_part( async fn handle_get_part(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>,
object_version: &ObjectVersion, object_version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
part_number: u64, part_number: u64,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data { let resp_builder =
garage object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if part_number != 1 {
return Err(Error::InvalidPart);
}
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
CONTENT_RANGE,
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.body(Body::from(bytes.to_vec()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
.version_table .version_table
.get(&object_version.uuid, &EmptyKey) .get(&object_version.uuid, &EmptyKey)
.await? .await?
.ok_or(Error::NoSuchKey)? .ok_or(Error::NoSuchKey)?;
} else {
return Err(Error::BadRequest(
"Cannot handle part_number: not a multipart upload.".into(),
));
};
let blocks = version let (begin, end) =
.blocks calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
.items() let n_parts = version.parts_etags.items().len();
.iter()
.filter(|(k, _)| k.part_number == part_number)
.cloned()
.collect::<Vec<_>>();
if blocks.is_empty() { let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
}
let part_size = blocks.iter().map(|(_, b)| b.size).sum(); Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
if let Some(r) = parse_range_header(req, part_size)? {
let range_begin = r.start;
let range_end = r.start + r.length;
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
.header( .header(
CONTENT_RANGE, CONTENT_RANGE,
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size), format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
) )
.status(StatusCode::PARTIAL_CONTENT) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts))
.body(body)?)
} else {
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.status(StatusCode::OK)
.body(body)?) .body(body)?)
} }
_ => unreachable!(),
}
} }
fn parse_range_header( fn parse_range_header(
@ -382,6 +392,22 @@ fn parse_range_header(
Ok(range) Ok(range)
} }
fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
let mut offset = 0;
for (i, (bk, bv)) in v.blocks.items().iter().enumerate() {
if bk.part_number == part_number {
let size: u64 = v.blocks.items()[i..]
.iter()
.take_while(|(k, _)| k.part_number == part_number)
.map(|(_, v)| v.size)
.sum();
return Some((offset, offset + size));
}
offset += bv.size;
}
None
}
fn body_from_blocks_range( fn body_from_blocks_range(
garage: Arc<Garage>, garage: Arc<Garage>,
all_blocks: &[(VersionBlockKey, VersionBlock)], all_blocks: &[(VersionBlockKey, VersionBlock)],

View file

@ -559,6 +559,17 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::InvalidPartOrder); return Err(Error::InvalidPartOrder);
} }
// Garage-specific restriction, see #204: part numbers must be
// consecutive starting at 1
if body_list_of_parts[0].part_number != 1
|| !body_list_of_parts
.iter()
.zip(body_list_of_parts.iter().skip(1))
.all(|(p1, p2)| p1.part_number + 1 == p2.part_number)
{
return Err(Error::NotImplemented("Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic.".into()));
}
// Check that the list of parts they gave us corresponds to the parts we have here // Check that the list of parts they gave us corresponds to the parts we have here
debug!("Expected parts from request: {:?}", body_list_of_parts); debug!("Expected parts from request: {:?}", body_list_of_parts);
debug!("Parts stored in version: {:?}", version.parts_etags.items()); debug!("Parts stored in version: {:?}", version.parts_etags.items());