diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs index 4279d7b5..96be0d5b 100644 --- a/src/api/common/signature/body.rs +++ b/src/api/common/signature/body.rs @@ -78,7 +78,7 @@ impl ReqBody { trailer_algorithm, } = self; - let (frame_tx, mut frame_rx) = mpsc::channel::>(1); + let (frame_tx, mut frame_rx) = mpsc::channel::>(5); let join_checksums = tokio::spawn(async move { while let Some(frame) = frame_rx.recv().await { diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs index 78518436..50fbd304 100644 --- a/src/api/common/signature/mod.rs +++ b/src/api/common/signature/mod.rs @@ -62,7 +62,6 @@ pub struct VerifiedRequest { pub request: Request, pub access_key: Key, pub content_sha256_header: ContentSha256Header, - // TODO: oneshot chans to retrieve hashes after reading all body } pub async fn verify_request( diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index 4ca0153f..2d5f8603 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -74,21 +74,16 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result true, + UNSIGNED_PAYLOAD => false, + _ => { + return Err(Error::bad_request( + "invalid or unsupported x-amz-content-sha256", + )) + } + }; + Ok(ContentSha256Header::StreamingPayload { trailer, signed }) } else { let sha256 = hex::decode(header) .ok() diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index 70b6e004..64362727 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -30,22 +30,12 @@ pub fn parse_streaming_body( checked_signature.content_sha256_header ); - let expected_checksums = ExpectedChecksums { - sha256: match &checked_signature.content_sha256_header { - ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), - _ => None, - }, - ..Default::default() - }; - - let mut checksummer = Checksummer::init(&expected_checksums, false); - match checked_signature.content_sha256_header { ContentSha256Header::StreamingPayload { signed, trailer } => { // Sanity checks if !signed && !trailer { return Err(Error::bad_request( - "STREAMING-UNSIGNED-PAYLOAD is not a valid combination", + "STREAMING-UNSIGNED-PAYLOAD without trailer is not a valid combination", )); } @@ -64,6 +54,7 @@ pub fn parse_streaming_body( } // If trailer header is announced, add the calculation of the requested checksum + let mut checksummer = Checksummer::init(&Default::default(), false); let trailer_algorithm = if trailer { let algo = Some( request_trailer_checksum_algorithm(req.headers())? @@ -128,12 +119,21 @@ pub fn parse_streaming_body( ReqBody { stream: Mutex::new(signed_payload_stream.boxed()), checksummer, - expected_checksums, + expected_checksums: Default::default(), trailer_algorithm, } })) } _ => Ok(req.map(|body| { + let expected_checksums = ExpectedChecksums { + sha256: match &checked_signature.content_sha256_header { + ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), + _ => None, + }, + ..Default::default() + }; + let checksummer = Checksummer::init(&expected_checksums, false); + let stream = http_body_util::BodyStream::new(body).map_err(Error::from); ReqBody { stream: Mutex::new(stream.boxed()), diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 53eff6ad..1ee04bc1 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -114,14 +114,17 @@ pub async fn handle_put_part( extra: request_checksum_value(req.headers())?, }; - // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); let (req_head, mut req_body) = req.into_parts(); + // Before we stream the body, configure the needed checksums. req_body.add_expected_checksums(expected_checksums.clone()); // TODO: avoid parsing encryption headers twice... if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as an etag of the + // part, which is in turn used in the etag computation of the whole object req_body.add_md5(); } @@ -130,6 +133,7 @@ pub async fn handle_put_part( let mut chunker = StreamChunker::new(stream, garage.config.block_size); + // Read first chuck, and at the same time try to get object to see if it exists let ((_, object_version, mut mpu), first_block) = futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; @@ -186,7 +190,6 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - // TODO don't duplicate checksums let (total_size, _, _) = read_and_put_blocks( &ctx, &version, @@ -198,7 +201,7 @@ pub async fn handle_put_part( ) .await?; - // Verify that checksums map + // Verify that checksums match let checksums = stream_checksums .await .ok_or_internal_error("checksum calculation")??; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 6fcf33cb..496d80f3 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -79,9 +79,14 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; + // The request body is a special ReqBody object (see garage_api_common::signature::body) + // which supports calculating checksums while streaming the data. + // Before we start streaming, we configure it to calculate all the checksums we need. let mut req_body = req.into_body(); req_body.add_expected_checksums(expected_checksums.clone()); if !encryption.is_encrypted() { + // For non-encrypted objects, we need to compute the md5sum in all cases + // (even if content-md5 is not set), because it is used as the object etag req_body.add_md5(); }