From fe003d6fbccbd202b41f3bdb29df11bd2597fef3 Mon Sep 17 00:00:00 2001
From: Quentin Dufour
Date: Fri, 14 Jan 2022 17:51:34 +0100
Subject: [PATCH 1/3] Add ListPartsResult structure

---
 src/api/s3_xml.rs | 127 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 121 insertions(+), 6 deletions(-)

diff --git a/src/api/s3_xml.rs b/src/api/s3_xml.rs
index 1df4ed60..7bbfa083 100644
--- a/src/api/s3_xml.rs
+++ b/src/api/s3_xml.rs
@@ -27,12 +27,6 @@ pub struct Bucket {
 	pub name: Value,
 }
 
-#[derive(Debug, Serialize, PartialEq)]
-pub struct DisplayName(#[serde(rename = "$value")] pub String);
-
-#[derive(Debug, Serialize, PartialEq)]
-pub struct Id(#[serde(rename = "$value")] pub String);
-
 #[derive(Debug, Serialize, PartialEq)]
 pub struct Owner {
 	#[serde(rename = "DisplayName")]
@@ -187,6 +181,46 @@ pub struct ListMultipartUploadsResult {
 	pub encoding_type: Option<Value>,
 }
 
+#[derive(Debug, Serialize, PartialEq)]
+pub struct PartItem {
+	#[serde(rename = "ETag")]
+	pub etag: Value,
+	#[serde(rename = "LastModified")]
+	pub last_modified: Value,
+	#[serde(rename = "PartNumber")]
+	pub part_number: IntValue,
+	#[serde(rename = "Size")]
+	pub size: IntValue,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListPartsResult {
+	#[serde(serialize_with = "xmlns_tag")]
+	pub xmlns: (),
+	#[serde(rename = "Bucket")]
+	pub bucket: Value,
+	#[serde(rename = "Key")]
+	pub key: Value,
+	#[serde(rename = "UploadId")]
+	pub upload_id: Value,
+	#[serde(rename = "PartNumberMarker")]
+	pub part_number_marker: Option<IntValue>,
+	#[serde(rename = "NextPartNumberMarker")]
+	pub next_part_number_marker: Option<IntValue>,
+	#[serde(rename = "MaxParts")]
+	pub max_parts: IntValue,
+	#[serde(rename = "IsTruncated")]
+	pub is_truncated: Value,
+	#[serde(rename = "Part", default)]
+	pub parts: Vec<PartItem>,
+	#[serde(rename = "Initiator")]
+	pub initiator: Initiator,
+	#[serde(rename = "Owner")]
+	pub owner: Owner,
+	#[serde(rename = "StorageClass")]
+	pub storage_class: Value,
+}
+
 #[derive(Debug, Serialize, PartialEq)]
 pub struct ListBucketItem {
 	#[serde(rename = "Key")]
@@ -706,4 +740,85 @@ mod tests {
 		);
 		Ok(())
 	}
+
+	#[test]
+	fn list_parts() -> Result<(), ApiError> {
+		let result = ListPartsResult {
+			xmlns: (),
+			bucket: Value("example-bucket".to_string()),
+			key: Value("example-object".to_string()),
+			upload_id: Value(
+				"XXBsb2FkIElEIGZvciBlbHZpbmcncyVcdS1tb3ZpZS5tMnRzEEEwbG9hZA".to_string(),
+			),
+			part_number_marker: Some(IntValue(1)),
+			next_part_number_marker: Some(IntValue(3)),
+			max_parts: IntValue(2),
+			is_truncated: Value("true".to_string()),
+			parts: vec![
+				PartItem {
+					etag: Value("\"7778aef83f66abc1fa1e8477f296d394\"".to_string()),
+					last_modified: Value("2010-11-10T20:48:34.000Z".to_string()),
+					part_number: IntValue(2),
+					size: IntValue(10485760),
+				},
+				PartItem {
+					etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()),
+					last_modified: Value("2010-11-10T20:48:33.000Z".to_string()),
+					part_number: IntValue(3),
+					size: IntValue(10485760),
+				},
+			],
+			initiator: Initiator {
+				display_name: Value("umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx".to_string()),
+				id: Value(
+					"arn:aws:iam::111122223333:user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx"
+						.to_string(),
+				),
+			},
+			owner: Owner {
+				display_name: Value("someName".to_string()),
+				id: Value(
+					"75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a".to_string(),
+				),
+			},
+			storage_class: Value("STANDARD".to_string()),
+		};
+
+		assert_eq!(
+			to_xml_with_header(&result)?,
+			"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListPartsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+	<Bucket>example-bucket</Bucket>\
+	<Key>example-object</Key>\
+	<UploadId>XXBsb2FkIElEIGZvciBlbHZpbmcncyVcdS1tb3ZpZS5tMnRzEEEwbG9hZA</UploadId>\
+	<PartNumberMarker>1</PartNumberMarker>\
+	<NextPartNumberMarker>3</NextPartNumberMarker>\
+	<MaxParts>2</MaxParts>\
+	<IsTruncated>true</IsTruncated>\
+	<Part>\
+		<ETag>&quot;7778aef83f66abc1fa1e8477f296d394&quot;</ETag>\
+		<LastModified>2010-11-10T20:48:34.000Z</LastModified>\
+		<PartNumber>2</PartNumber>\
+		<Size>10485760</Size>\
+	</Part>\
+	<Part>\
+		<ETag>&quot;aaaa18db4cc2f85cedef654fccc4a4x8&quot;</ETag>\
+		<LastModified>2010-11-10T20:48:33.000Z</LastModified>\
+		<PartNumber>3</PartNumber>\
+		<Size>10485760</Size>\
+	</Part>\
+	<Initiator>\
+		<DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\
+		<ID>arn:aws:iam::111122223333:user/some-user-11116a31-17b5-4fb7-9df5-b288870f11xx</ID>\
+	</Initiator>\
+	<Owner>\
+		<DisplayName>someName</DisplayName>\
+		<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>\
+	</Owner>\
+	<StorageClass>STANDARD</StorageClass>\
+</ListPartsResult>"
+		);
+
+		Ok(())
+	}
 }
-- 
2.43.0

From 440374524bcd21cade7b68410eb2d12ba23cfaaf Mon Sep 17 00:00:00 2001
From: Quentin Dufour
Date: Wed, 19 Jan 2022 17:16:00 +0100
Subject: [PATCH 2/3] Implement ListParts

---
 src/api/api_server.rs |  19 +++
 src/api/s3_list.rs    | 374 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 389 insertions(+), 4 deletions(-)

diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index b064ac24..dfb8dfdb 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -294,6 +294,25 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
 			)
 			.await
 		}
+		Endpoint::ListParts {
+			key,
+			max_parts,
+			part_number_marker,
+			upload_id,
+		} => {
+			handle_list_parts(
+				garage,
+				&ListPartsQuery {
+					bucket_name,
+					bucket_id,
+					key,
+					upload_id,
+					part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+					max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+				},
+			)
+			.await
+		}
 		Endpoint::DeleteObjects {} => {
 			handle_delete_objects(garage, bucket_id, req, content_sha256).await
 		}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index c3bc6938..92998159 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -1,3 +1,4 @@
+use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
 use std::iter::{Iterator, Peekable};
 use std::sync::Arc;
@@ -10,12 +11,18 @@ use garage_util::time::*;
 
 use garage_model::garage::Garage;
 use garage_model::object_table::*;
+use garage_model::version_table::Version;
+
+use garage_table::EmptyKey;
 
 use crate::encoding::*;
 use crate::error::*;
 use crate::s3_put;
 use crate::s3_xml;
 
+const DUMMY_NAME: &str = "Dummy Key";
+const DUMMY_KEY: &str = "GKDummyKey";
+
 #[derive(Debug)]
 pub struct ListQueryCommon {
 	pub bucket_name: String,
@@ -42,6 +49,16 @@ pub struct ListMultipartUploadsQuery {
 	pub common: ListQueryCommon,
 }
 
+#[derive(Debug)]
+pub struct ListPartsQuery {
+	pub bucket_name: String,
+	pub bucket_id: Uuid,
+	pub key: String,
+	pub upload_id: String,
+	pub part_number_marker: Option<u64>,
+	pub max_parts: u64,
+}
+
 pub async fn handle_list(
 	garage: Arc<Garage>,
 	query: &ListObjectsQuery,
@@ -54,6 +71,7 @@
 		}
 	};
 
+	debug!("ListObjects {:?}", query);
 	let mut acc = query.build_accumulator();
 	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
 
@@ -152,6 +170,7 @@ pub async fn handle_list_multipart_upload(
 		}
 	};
 
+	debug!("ListMultipartUploads {:?}", query);
 	let mut acc = query.build_accumulator();
 	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
 
@@ -208,12 +227,12 @@ pub async fn handle_list_multipart_upload(
 				upload_id: s3_xml::Value(hex::encode(uuid)),
 				storage_class: s3_xml::Value("STANDARD".to_string()),
 				initiator: s3_xml::Initiator {
-					display_name: s3_xml::Value("Dummy Key".to_string()),
-					id: s3_xml::Value("GKDummyKey".to_string()),
+					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+					id: s3_xml::Value(DUMMY_KEY.to_string()),
 				},
 				owner: s3_xml::Owner {
-					display_name: s3_xml::Value("Dummy Key".to_string()),
-					id: s3_xml::Value("GKDummyKey".to_string()),
+					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+					id: s3_xml::Value(DUMMY_KEY.to_string()),
 				},
 			})
 			.collect(),
@@ -233,6 +252,57 @@ pub async fn handle_list_multipart_upload(
 		.body(Body::from(xml.into_bytes()))?)
 }
 
+pub async fn handle_list_parts(
+	garage: Arc<Garage>,
+	query: &ListPartsQuery,
+) -> Result<Response<Body>, Error> {
+	debug!("ListParts {:?}", query);
+
+	let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+
+	let (object, version) = futures::try_join!(
+		garage.object_table.get(&query.bucket_id, &query.key),
+		garage.version_table.get(&upload_id, &EmptyKey),
+	)?;
+
+	let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+
+	let result = s3_xml::ListPartsResult {
+		xmlns: (),
+		bucket: s3_xml::Value(query.bucket_name.to_string()),
+		key: s3_xml::Value(query.key.to_string()),
+		upload_id: s3_xml::Value(query.upload_id.to_string()),
+		part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
+		next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
+		max_parts: s3_xml::IntValue(query.max_parts as i64),
+		is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
+		parts: info
+			.iter()
+			.map(|part| s3_xml::PartItem {
+				etag: s3_xml::Value(format!("\"{}\"", part.etag)),
+				last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
+				part_number: s3_xml::IntValue(part.part_number as i64),
+				size: s3_xml::IntValue(part.size as i64),
+			})
+			.collect(),
+		initiator: s3_xml::Initiator {
+			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+			id: s3_xml::Value(DUMMY_KEY.to_string()),
+		},
+		owner: s3_xml::Owner {
+			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+			id: s3_xml::Value(DUMMY_KEY.to_string()),
+		},
+		storage_class: s3_xml::Value("STANDARD".to_string()),
+	};
+
+	let xml = s3_xml::to_xml_with_header(&result)?;
+
+	Ok(Response::builder()
+		.header("Content-Type", "application/xml")
+		.body(Body::from(xml.into_bytes()))?)
+}
+
 /*
  * Private enums and structs
  */
@@ -250,6 +320,14 @@ struct UploadInfo {
 	timestamp: u64,
 }
 
+#[derive(Debug, PartialEq)]
+struct PartInfo {
+	etag: String,
+	timestamp: u64,
+	part_number: u64,
+	size: u64,
+}
+
 enum ExtractionResult {
 	NoMore,
 	Filled,
@@ -364,6 +442,109 @@ where
 	}
 }
 
+fn fetch_part_info(
+	query: &ListPartsQuery,
+	object: Option<Object>,
+	version: Option<Version>,
+	upload_id: Uuid,
+) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
+	// Check results
+	let object = object.ok_or(Error::NoSuchKey)?;
+
+	let obj_version = object
+		.versions()
+		.iter()
+		.find(|v| v.uuid == upload_id && v.is_uploading())
+		.ok_or(Error::NoSuchUpload)?;
+
+	let version = version.ok_or(Error::NoSuchKey)?;
+
+	// Cut the beginning of our 2 vectors if required
+	let (etags, blocks) = match &query.part_number_marker {
+		Some(marker) => {
+			let next = marker + 1;
+
+			let part_idx = into_ok_or_err(
+				version
+					.parts_etags
+					.items()
+					.binary_search_by(|(part_num, _)| part_num.cmp(&next)),
+			);
+			let parts = &version.parts_etags.items()[part_idx..];
+
+			let block_idx = into_ok_or_err(
+				version
+					.blocks
+					.items()
+					.binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
+			);
+			let blocks = &version.blocks.items()[block_idx..];
+
+			(parts, blocks)
+		}
+		None => (version.parts_etags.items(), version.blocks.items()),
+	};
+
+	// Use the block vector to compute a (part_number, size) vector
+	let mut size = Vec::<(u64, u64)>::new();
+	blocks.iter().for_each(|(key, val)| {
+		let mut new_size = val.size;
+		match size.pop() {
+			Some((part_number, size)) if part_number == key.part_number => new_size += size,
+			Some(v) => size.push(v),
+			None => (),
+		}
+		size.push((key.part_number, new_size))
+	});
+
+	// Merge the etag vector and size vector to build a PartInfo vector
+	let max_parts = query.max_parts as usize;
+	let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
+
+	let mut info = Vec::<PartInfo>::with_capacity(max_parts);
+
+	while info.len() < max_parts {
+		match (etag_iter.peek(), size_iter.peek()) {
+			(Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
+				Ordering::Less => {
+					debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
+					etag_iter.next();
+				}
+				Ordering::Equal => {
+					info.push(PartInfo {
+						etag: etag.to_string(),
+						timestamp: obj_version.timestamp,
+						part_number: *ep,
+						size: *size,
+					});
+					etag_iter.next();
+					size_iter.next();
+				}
+				Ordering::Greater => {
+					debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
+					size_iter.next();
+				}
+			},
+			(None, None) => return Ok((info, None)),
+			_ => {
+				debug!(
+					"Additional block or ETag information ignored. Query: {:?}",
+					query
+				);
+				return Ok((info, None));
+			}
+		}
+	}
+
+	match info.last() {
+		Some(part_info) => {
+			let pagination = Some(part_info.part_number);
+			Ok((info, pagination))
+		}
+		None => Ok((info, None)),
+	}
+}
+
 /*
  * ListQuery logic
  */
@@ -715,6 +896,14 @@ impl ExtractAccumulator for UploadAccumulator {
  * Utility functions
  */
 
+/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
+fn into_ok_or_err<T>(r: Result<T, T>) -> T {
+	match r {
+		Ok(r) => r,
+		Err(r) => r,
+	}
+}
+
 /// Returns the common prefix of the object given the query prefix and delimiter
 fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
 	match &query.delimiter {
@@ -766,6 +955,8 @@ fn key_after_prefix(pfx: &str) -> Option<String> {
 #[cfg(test)]
 mod tests {
 	use super::*;
+	use garage_model::version_table::*;
+	use garage_util::*;
 	use std::iter::FromIterator;
 
 	const TS: u64 = 1641394898314;
@@ -1014,4 +1205,179 @@
 
 		Ok(())
 	}
+
+	fn version() -> Version {
+		let uuid = Uuid::from([0x08; 32]);
+
+		let blocks = vec![
+			(
+				VersionBlockKey {
+					part_number: 1,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 3,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 1,
+					offset: 2,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 2,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 2,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 8,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 5,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 7,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 8,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 5,
+				},
+			),
+		];
+		let etags = vec![
+			(1, "etag1".to_string()),
+			(3, "etag2".to_string()),
+			(5, "etag3".to_string()),
+			(8, "etag4".to_string()),
+			(9, "etag5".to_string()),
+		];
+
+		Version {
+			bucket_id: uuid,
+			key: "a".to_string(),
+			uuid: uuid,
+			deleted: false.into(),
+			blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
+			parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+		}
+	}
+
+	fn obj() -> Object {
+		Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
+	}
+
+	#[test]
+	fn test_fetch_part_info() -> Result<(), Error> {
+		let uuid = Uuid::from([0x08; 32]);
+		let mut query = ListPartsQuery {
+			bucket_name: "a".to_string(),
+			bucket_id: uuid,
+			key: "a".to_string(),
+			upload_id: "xx".to_string(),
+			part_number_marker: None,
+			max_parts: 2,
+		};
+
+		assert!(
+			fetch_part_info(&query, None, None, uuid).is_err(),
+			"No object and version should fail"
+		);
+		assert!(
+			fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
+			"No version should fail"
+		);
+		assert!(
+			fetch_part_info(&query, None, Some(version()), uuid).is_err(),
+			"No object should fail"
+		);
+
+		// Start from the beginning but with limited size to trigger pagination
+		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+		assert_eq!(pagination.unwrap(), 5);
+		assert_eq!(
+			info,
+			vec![
+				PartInfo {
+					etag: "etag1".to_string(),
+					timestamp: TS,
+					part_number: 1,
+					size: 5
+				},
+				PartInfo {
+					etag: "etag3".to_string(),
+					timestamp: TS,
+					part_number: 5,
+					size: 7
+				},
+			]
+		);
+
+		// Use previous pagination to make a new request
+		query.part_number_marker = Some(pagination.unwrap());
+		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+		assert!(pagination.is_none());
+		assert_eq!(
+			info,
+			vec![PartInfo {
+				etag: "etag4".to_string(),
+				timestamp: TS,
+				part_number: 8,
+				size: 5
+			},]
+		);
+
+		// Trying to access a part that is way larger than registered ones
+		query.part_number_marker = Some(9999);
+		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+		assert!(pagination.is_none());
+		assert_eq!(info, vec![]);
+
+		// Try without any limitation
+		query.max_parts = 1000;
+		query.part_number_marker = None;
+		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+		assert!(pagination.is_none());
+		assert_eq!(
+			info,
+			vec![
+				PartInfo {
+					etag: "etag1".to_string(),
+					timestamp: TS,
+					part_number: 1,
+					size: 5
+				},
+				PartInfo {
+					etag: "etag3".to_string(),
+					timestamp: TS,
+					part_number: 5,
+					size: 7
+				},
+				PartInfo {
+					etag: "etag4".to_string(),
+					timestamp: TS,
+					part_number: 8,
+					size: 5
+				},
+			]
+		);
+
+		Ok(())
+	}
 }
-- 
2.43.0

From 94f0e7c135798af4dcb023f846b6b0b4adb3b2da Mon Sep 17 00:00:00 2001
From: Quentin Dufour
Date: Thu, 20 Jan 2022 16:38:28 +0100
Subject: [PATCH 3/3] Test ListParts endpoint with awscli

---
 .../src/reference_manual/s3_compatibility.md |  2 +-
 script/test-smoke.sh                         | 53 ++++++++++++++++++-
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/doc/book/src/reference_manual/s3_compatibility.md b/doc/book/src/reference_manual/s3_compatibility.md
index 5c2e6315..4c4c6457 100644
--- a/doc/book/src/reference_manual/s3_compatibility.md
+++ b/doc/book/src/reference_manual/s3_compatibility.md
@@ -44,7 +44,7 @@ All APIs that are not mentionned are not implemented and will return a 501 Not I
 | ListObjects          | Implemented, bugs? (see below)   |
 | ListObjectsV2        | Implemented                      |
 | ListMultipartUpload  | Implemented                      |
-| ListParts            | Missing                          |
+| ListParts            | Implemented                      |
 | PutObject            | Implemented                      |
 | PutBucketWebsite     | Partially implemented (see below)|
 | UploadPart           | Implemented                      |
diff --git a/script/test-smoke.sh b/script/test-smoke.sh
index 91bf90ab..0adf322a 100755
--- a/script/test-smoke.sh
+++ b/script/test-smoke.sh
@@ -252,7 +252,58 @@ if [ -z "$SKIP_AWS" ]; then
 		echo "Deleted ${key}:${uid}"
 	done
 
-	# Test for UploadPartCopy
+	echo "Test for ListParts"
+	UPLOAD_ID=$(aws s3api create-multipart-upload --bucket eprouvette --key list-parts | jq -r .UploadId)
+	aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
+	[ $(jq '.Parts | length' $CMDOUT) == 0 ]
+	[ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty
+	ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 10 --body /tmp/garage.2.rnd | jq .ETag)
+	aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
+	[ $(jq '.Parts | length' $CMDOUT) == 1 ]
+	[ $(jq '.Parts[0].PartNumber' $CMDOUT) == 10 ]
+	[ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ]
+	[ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ]
+
+	ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 9999 --body /tmp/garage.3.rnd | jq .ETag)
+	ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 30 --body /tmp/garage.2.rnd | jq .ETag)
+	aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
+	[ $(jq '.Parts | length' $CMDOUT) == 3 ]
+	[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
+
+	aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --page-size 1 >$CMDOUT
+	[ $(jq '.Parts | length' $CMDOUT) == 3 ]
+	[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
+
+	cat >/tmp/garage.multipart_struct <<EOF
+{
+	"Parts": [
+		{
+			"ETag": $ETAG1,
+			"PartNumber": 10
+		},
+		{
+			"ETag": $ETAG3,
+			"PartNumber": 30
+		},
+		{
+			"ETag": $ETAG2,
+			"PartNumber": 9999
+		}
+	]
+}
+EOF
+
+	aws s3api complete-multipart-upload \
+		--bucket eprouvette --key list-parts \
+		--upload-id $UPLOAD_ID \
+		--multipart-upload file:///tmp/garage.multipart_struct >$CMDOUT
+	aws s3 rm "s3://eprouvette/list-parts"
+
+
+	# @FIXME We do not write tests with --starting-token due to a bug with awscli
+	# See here: https://github.com/aws/aws-cli/issues/6666
+
+	echo "Test for UploadPartCopy"
 	aws s3 cp "/tmp/garage.3.rnd" "s3://eprouvette/copy_part_source"
 	UPLOAD_ID=$(aws s3api create-multipart-upload --bucket eprouvette --key test_multipart | jq -r .UploadId)
 	PART1=$(aws s3api upload-part \
-- 
2.43.0