From af7600f989d79d07253405647973828435f9d16c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 10 Mar 2021 17:01:05 +0100 Subject: [PATCH] Correctly implement CompleteMultipartUpload with etag check of parts --- src/api/s3_put.rs | 49 +++++++++++++++++++++++--------------- src/model/version_table.rs | 6 +++++ 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 1939f2cf..7aec12ed 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -100,7 +100,7 @@ pub async fn handle_put( // Transfer data and verify checksum let tx_result = read_and_put_blocks( &garage, - version, + &version, 1, first_block, first_block_hash, @@ -174,7 +174,7 @@ fn ensure_checksum_matches( async fn read_and_put_blocks( garage: &Arc, - version: Version, + version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, @@ -392,7 +392,7 @@ pub async fn handle_put_part( let first_block_hash = blake2sum(&first_block[..]); let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, - version, + &version, part_number, first_block, first_block_hash, @@ -400,6 +400,7 @@ pub async fn handle_put_part( ) .await?; + // Verify that checksums map ensure_checksum_matches( data_md5sum.as_slice(), data_sha256sum, @@ -407,8 +408,14 @@ pub async fn handle_put_part( content_sha256, )?; + // Store part etag in version + let data_md5sum_hex = hex::encode(data_md5sum); + let mut version = version; + version.parts_etags.put(part_number, data_md5sum_hex.clone()); + garage.version_table.insert(&version).await?; + let response = Response::builder() - .header("ETag", format!("\"{}\"", hex::encode(data_md5sum))) + .header("ETag", format!("\"{}\"", data_md5sum_hex)) .body(Body::from(vec![])) .unwrap(); Ok(response) @@ -463,43 +470,46 @@ pub async fn handle_complete_multipart_upload( }; // Check that the list of parts they gave us corresponds to the parts we have here - // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... - let mut parts = version - .blocks + println!("Expected parts from request: {:?}", body_list_of_parts); + println!("Parts stored in version: {:?}", version.parts_etags.items()); + let parts = version + .parts_etags .items() .iter() - .map(|x| x.0.part_number) - .collect::>(); - parts.dedup(); + .map(|pair| (&pair.0, &pair.1)); let same_parts = body_list_of_parts .iter() - .map(|x| &x.part_number) - .eq(parts.iter()); + .map(|x| (&x.part_number, &x.etag)) + .eq(parts); if !same_parts { return Err(Error::BadRequest(format!("We don't have the same parts"))); } - // ETag calculation: we produce ETags that have the same form as - // those of S3 multipart uploads, but we don't use their actual - // calculation for the first part (we use random bytes). This - // shouldn't impact compatibility as the S3 docs specify that - // the ETag is an opaque value in case of a multipart upload. - // See also: https://teppen.io/2018/06/23/aws_s3_etags/ + // Calculate etag of final object + // To understand how etags are calculated, read more here: + // https://teppen.io/2018/06/23/aws_s3_etags/ let num_parts = version.blocks.items().last().unwrap().0.part_number - version.blocks.items().first().unwrap().0.part_number + 1; + let mut etag_md5_hasher = Md5::new(); + for (_, etag) in version.parts_etags.items().iter() { + etag_md5_hasher.update(etag.as_bytes()); + } let etag = format!( "{}-{}", - hex::encode(&rand::random::<[u8; 16]>()[..]), + hex::encode(etag_md5_hasher.finalize()), num_parts ); + // Calculate total size of final object let total_size = version .blocks .items() .iter() .map(|x| x.1.size) .fold(0, |x, y| x + y); + + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -512,6 +522,7 @@ pub async fn handle_complete_multipart_upload( let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; + // Send response saying ok we're done let mut xml = String::new(); writeln!(&mut xml, r#""#).unwrap(); writeln!( diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 26abb64e..7ccc6a33 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -16,8 +16,11 @@ pub struct Version { pub uuid: UUID, // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload pub deleted: crdt::Bool, pub blocks: crdt::Map, + pub parts_etags: crdt::Map, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -31,6 +34,7 @@ impl Version { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), + parts_etags: crdt::Map::new(), bucket, key, } @@ -82,8 +86,10 @@ impl CRDT for Version { if self.deleted.get() { self.blocks.clear(); + self.parts_etags.clear(); } else { self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); } } }