Correctly implement CompleteMultipartUpload with etag check of parts
All checks were successful: continuous-integration/drone/push build is passing

Alex 2021-03-10 17:01:05 +01:00
parent 445912dc6a
commit af7600f989
2 changed files with 36 additions and 19 deletions


@@ -100,7 +100,7 @@ pub async fn handle_put(
 	// Transfer data and verify checksum
 	let tx_result = read_and_put_blocks(
 		&garage,
-		version,
+		&version,
 		1,
 		first_block,
 		first_block_hash,
@@ -174,7 +174,7 @@ fn ensure_checksum_matches(

 async fn read_and_put_blocks(
 	garage: &Arc<Garage>,
-	version: Version,
+	version: &Version,
 	part_number: u64,
 	first_block: Vec<u8>,
 	first_block_hash: Hash,
@@ -392,7 +392,7 @@ pub async fn handle_put_part(
 	let first_block_hash = blake2sum(&first_block[..]);
 	let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
 		&garage,
-		version,
+		&version,
 		part_number,
 		first_block,
 		first_block_hash,
@@ -400,6 +400,7 @@ pub async fn handle_put_part(
 	)
 	.await?;

+	// Verify that checksums map
 	ensure_checksum_matches(
 		data_md5sum.as_slice(),
 		data_sha256sum,
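A note on the check invoked here: the MD5 and SHA-256 digests computed while storing the part are compared against the checksums the client supplied with the request (presumably the Content-MD5 and x-amz-content-sha256 values). A minimal standalone sketch of that kind of comparison, not Garage's actual ensure_checksum_matches implementation:

// Hypothetical standalone sketch (not Garage's ensure_checksum_matches):
// if the client claimed a checksum, the digest computed over the received
// body must match it, otherwise the upload is rejected.
fn checksums_match(
	computed_md5: &[u8; 16],           // MD5 computed over the stored data
	computed_sha256: &[u8; 32],        // SHA-256 computed over the stored data
	claimed_md5: Option<&[u8; 16]>,    // from the Content-MD5 header, if any
	claimed_sha256: Option<&[u8; 32]>, // from x-amz-content-sha256, if any
) -> Result<(), String> {
	if let Some(md5) = claimed_md5 {
		if md5 != computed_md5 {
			return Err("body does not match provided MD5 checksum".into());
		}
	}
	if let Some(sha256) = claimed_sha256 {
		if sha256 != computed_sha256 {
			return Err("body does not match provided SHA-256 checksum".into());
		}
	}
	Ok(())
}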
@@ -407,8 +408,14 @@ pub async fn handle_put_part(
 		content_sha256,
 	)?;

+	// Store part etag in version
+	let data_md5sum_hex = hex::encode(data_md5sum);
+	let mut version = version;
+	version.parts_etags.put(part_number, data_md5sum_hex.clone());
+	garage.version_table.insert(&version).await?;
+
 	let response = Response::builder()
-		.header("ETag", format!("\"{}\"", hex::encode(data_md5sum)))
+		.header("ETag", format!("\"{}\"", data_md5sum_hex))
 		.body(Body::from(vec![]))
 		.unwrap();
 	Ok(response)
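The ETag returned for each part above is simply the hex-encoded MD5 of the part body, which is what S3 clients expect. As a usage sketch (assuming the md-5 and hex crates already used by this handler), a client could double-check a returned part ETag like this:

use md5::{Digest, Md5};

// Client-side sketch: recompute the expected per-part ETag and compare it
// to the value returned in the UploadPart response.
fn expected_part_etag(part_body: &[u8]) -> String {
	// The per-part ETag is the hex-encoded MD5 digest of the part's bytes.
	hex::encode(Md5::digest(part_body))
}

fn part_etag_matches(part_body: &[u8], returned_etag: &str) -> bool {
	// The server returns the ETag wrapped in double quotes.
	returned_etag.trim_matches('"') == expected_part_etag(part_body)
}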
@@ -463,43 +470,46 @@ pub async fn handle_complete_multipart_upload(
 	};

 	// Check that the list of parts they gave us corresponds to the parts we have here
-	// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
-	let mut parts = version
-		.blocks
+	println!("Expected parts from request: {:?}", body_list_of_parts);
+	println!("Parts stored in version: {:?}", version.parts_etags.items());
+	let parts = version
+		.parts_etags
 		.items()
 		.iter()
-		.map(|x| x.0.part_number)
-		.collect::<Vec<_>>();
-	parts.dedup();
+		.map(|pair| (&pair.0, &pair.1));
 	let same_parts = body_list_of_parts
 		.iter()
-		.map(|x| &x.part_number)
-		.eq(parts.iter());
+		.map(|x| (&x.part_number, &x.etag))
+		.eq(parts);
 	if !same_parts {
 		return Err(Error::BadRequest(format!("We don't have the same parts")));
 	}

-	// ETag calculation: we produce ETags that have the same form as
-	// those of S3 multipart uploads, but we don't use their actual
-	// calculation for the first part (we use random bytes). This
-	// shouldn't impact compatibility as the S3 docs specify that
-	// the ETag is an opaque value in case of a multipart upload.
-	// See also: https://teppen.io/2018/06/23/aws_s3_etags/
+	// Calculate etag of final object
+	// To understand how etags are calculated, read more here:
+	// https://teppen.io/2018/06/23/aws_s3_etags/
 	let num_parts = version.blocks.items().last().unwrap().0.part_number
 		- version.blocks.items().first().unwrap().0.part_number
 		+ 1;
+	let mut etag_md5_hasher = Md5::new();
+	for (_, etag) in version.parts_etags.items().iter() {
+		etag_md5_hasher.update(etag.as_bytes());
+	}
 	let etag = format!(
 		"{}-{}",
-		hex::encode(&rand::random::<[u8; 16]>()[..]),
+		hex::encode(etag_md5_hasher.finalize()),
 		num_parts
 	);

+	// Calculate total size of final object
 	let total_size = version
 		.blocks
 		.items()
 		.iter()
 		.map(|x| x.1.size)
 		.fold(0, |x, y| x + y);
+
+	// Write final object version
 	object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
 		ObjectVersionMeta {
 			headers,
@@ -512,6 +522,7 @@ pub async fn handle_complete_multipart_upload(
 	let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
 	garage.object_table.insert(&final_object).await?;

+	// Send response saying ok we're done
 	let mut xml = String::new();
 	writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
 	writeln!(
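For reference, the final ETag now has the usual S3 multipart shape "<hex digest>-<part count>", computed here by hashing the concatenation of the parts' hex ETag strings (the AWS calculation described at https://teppen.io/2018/06/23/aws_s3_etags/ hashes the raw 16-byte digests instead, so the values differ but the form is the same). A self-contained sketch of the computation done above:

use md5::{Digest, Md5};

// Sketch of the final multipart ETag as computed in handle_complete_multipart_upload:
// MD5 over the concatenated hex ETag strings of all parts, then "-<num_parts>".
fn multipart_etag(part_etags: &[String], num_parts: u64) -> String {
	let mut hasher = Md5::new();
	for etag in part_etags {
		// The hex string itself is hashed, not the decoded 16-byte digest.
		hasher.update(etag.as_bytes());
	}
	format!("{}-{}", hex::encode(hasher.finalize()), num_parts)
}

For a two-part upload with part ETags e1 and e2, this yields hex(md5(e1 + e2)) followed by "-2".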


@@ -16,8 +16,11 @@ pub struct Version {
 	pub uuid: UUID,

 	// Actual data: the blocks for this version
+	// In the case of a multipart upload, also store the etags
+	// of individual parts and check them when doing CompleteMultipartUpload
 	pub deleted: crdt::Bool,
 	pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+	pub parts_etags: crdt::Map<u64, String>,

 	// Back link to bucket+key so that we can figure if
 	// this was deleted later on
@@ -31,6 +34,7 @@ impl Version {
 			uuid,
 			deleted: deleted.into(),
 			blocks: crdt::Map::new(),
+			parts_etags: crdt::Map::new(),
 			bucket,
 			key,
 		}
@@ -82,8 +86,10 @@ impl CRDT for Version {

 		if self.deleted.get() {
 			self.blocks.clear();
+			self.parts_etags.clear();
 		} else {
 			self.blocks.merge(&other.blocks);
+			self.parts_etags.merge(&other.parts_etags);
 		}
 	}
 }
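The merge logic added for parts_etags mirrors what was already done for blocks: a version marked deleted drops both maps, otherwise each map is merged with its counterpart from the other replica. A simplified stand-in for that pattern (plain BTreeMaps with a keep-the-larger-value rule on key conflicts; Garage's crdt::Map has its own merge semantics, so this is only an illustration):

use std::collections::BTreeMap;

// Simplified stand-in for the Version merge above.  The real code uses
// Garage's crdt::Bool and crdt::Map; here plain fields play that role,
// with key conflicts resolved by keeping the larger value so the merge
// stays commutative and idempotent.
#[derive(Default)]
struct VersionSketch {
	deleted: bool,
	blocks: BTreeMap<u64, String>,      // part number -> block info (simplified)
	parts_etags: BTreeMap<u64, String>, // part number -> part ETag
}

fn merge_maps(a: &mut BTreeMap<u64, String>, b: &BTreeMap<u64, String>) {
	for (k, v) in b {
		a.entry(*k)
			.and_modify(|cur| {
				if *v > *cur {
					*cur = v.clone();
				}
			})
			.or_insert_with(|| v.clone());
	}
}

impl VersionSketch {
	fn merge(&mut self, other: &Self) {
		// "Deleted" wins: once either replica marks the version deleted,
		// its payload maps are dropped everywhere.
		self.deleted |= other.deleted;
		if self.deleted {
			self.blocks.clear();
			self.parts_etags.clear();
		} else {
			merge_maps(&mut self.blocks, &other.blocks);
			merge_maps(&mut self.parts_etags, &other.parts_etags);
		}
	}
}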