From 730bfee753c4f22cd0595d9195222de334ec36f9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 18 Feb 2025 13:59:43 +0100 Subject: [PATCH] api: validate trailing checksum + add test for unsigned-paylad-trailer --- Cargo.lock | 1 + src/api/common/signature/body.rs | 13 +- src/api/common/signature/checksum.rs | 81 +++++------ src/api/common/signature/streaming.rs | 58 +++++--- src/api/s3/post_object.rs | 5 +- src/garage/Cargo.toml | 1 + src/garage/tests/common/custom_requester.rs | 111 +++++++++++++-- src/garage/tests/common/garage.rs | 5 +- src/garage/tests/s3/streaming_signature.rs | 150 +++++++++++++++++++- 9 files changed, 337 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26f6ea1d..477e4456 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1230,6 +1230,7 @@ dependencies = [ "bytes", "bytesize", "chrono", + "crc32fast", "format_table", "futures", "garage_api_admin", diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs index 512d02b3..4279d7b5 100644 --- a/src/api/common/signature/body.rs +++ b/src/api/common/signature/body.rs @@ -17,6 +17,7 @@ pub struct ReqBody { pub(crate) stream: Mutex, Error>>>, pub(crate) checksummer: Checksummer, pub(crate) expected_checksums: ExpectedChecksums, + pub(crate) trailer_algorithm: Option, } pub type StreamingChecksumReceiver = task::JoinHandle>; @@ -74,6 +75,7 @@ impl ReqBody { stream, mut checksummer, mut expected_checksums, + trailer_algorithm, } = self; let (frame_tx, mut frame_rx) = mpsc::channel::>(1); @@ -91,18 +93,21 @@ impl ReqBody { } Err(frame) => { let trailers = frame.into_trailers().unwrap(); - if let Some(cv) = request_checksum_value(&trailers)? { - expected_checksums.extra = Some(cv); - } + let algo = trailer_algorithm.unwrap(); + expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?); break; } } } + if trailer_algorithm.is_some() && expected_checksums.extra.is_none() { + return Err(Error::bad_request("trailing checksum was not sent")); + } + let checksums = checksummer.finalize(); checksums.verify(&expected_checksums)?; - return Ok(checksums); + Ok(checksums) }); let stream: BoxStream<_> = stream.into_inner().unwrap(); diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs index 890c0452..3c5e7c53 100644 --- a/src/api/common/signature/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -12,10 +12,10 @@ use http::{HeaderMap, HeaderName, HeaderValue}; use garage_util::data::*; -use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; - use super::*; +pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue}; + pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName = @@ -198,17 +198,23 @@ impl Checksums { // ---- +pub fn parse_checksum_algorithm(algo: &str) -> Result { + match algo { + "CRC32" => Ok(ChecksumAlgorithm::Crc32), + "CRC32C" => Ok(ChecksumAlgorithm::Crc32c), + "SHA1" => Ok(ChecksumAlgorithm::Sha1), + "SHA256" => Ok(ChecksumAlgorithm::Sha256), + _ => Err(Error::bad_request("invalid checksum algorithm")), + } +} + /// Extract the value of the x-amz-checksum-algorithm header pub fn request_checksum_algorithm( headers: &HeaderMap, ) -> Result, Error> { match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { None => Ok(None), - Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)), - Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)), - Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)), - Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)), - _ => Err(Error::bad_request("invalid checksum algorithm")), + Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some), } } @@ -231,37 +237,17 @@ pub fn request_checksum_value( ) -> Result, Error> { let mut ret = vec![]; - if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) { - let crc32 = BASE64_STANDARD - .decode(&crc32_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - ret.push(ChecksumValue::Crc32(crc32)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?); } - if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) { - let crc32c = BASE64_STANDARD - .decode(&crc32c_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - ret.push(ChecksumValue::Crc32c(crc32c)) + if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?); } - if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) { - let sha1 = BASE64_STANDARD - .decode(&sha1_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - ret.push(ChecksumValue::Sha1(sha1)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA1) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?); } - if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) { - let sha256 = BASE64_STANDARD - .decode(&sha256_str) - .ok() - .and_then(|x| x.try_into().ok()) - .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - ret.push(ChecksumValue::Sha256(sha256)) + if headers.contains_key(X_AMZ_CHECKSUM_SHA256) { + ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?); } if ret.len() > 1 { @@ -274,44 +260,43 @@ pub fn request_checksum_value( /// Checks for the presence of x-amz-checksum-algorithm /// if so extract the corresponding x-amz-checksum-* value -pub fn request_checksum_algorithm_value( +pub fn extract_checksum_value( headers: &HeaderMap, -) -> Result, Error> { - match headers.get(X_AMZ_CHECKSUM_ALGORITHM) { - Some(x) if x == "CRC32" => { + algo: ChecksumAlgorithm, +) -> Result { + match algo { + ChecksumAlgorithm::Crc32 => { let crc32 = headers .get(X_AMZ_CHECKSUM_CRC32) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32 header")?; - Ok(Some(ChecksumValue::Crc32(crc32))) + Ok(ChecksumValue::Crc32(crc32)) } - Some(x) if x == "CRC32C" => { + ChecksumAlgorithm::Crc32c => { let crc32c = headers .get(X_AMZ_CHECKSUM_CRC32C) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-crc32c header")?; - Ok(Some(ChecksumValue::Crc32c(crc32c))) + Ok(ChecksumValue::Crc32c(crc32c)) } - Some(x) if x == "SHA1" => { + ChecksumAlgorithm::Sha1 => { let sha1 = headers .get(X_AMZ_CHECKSUM_SHA1) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha1 header")?; - Ok(Some(ChecksumValue::Sha1(sha1))) + Ok(ChecksumValue::Sha1(sha1)) } - Some(x) if x == "SHA256" => { + ChecksumAlgorithm::Sha256 => { let sha256 = headers .get(X_AMZ_CHECKSUM_SHA256) .and_then(|x| BASE64_STANDARD.decode(&x).ok()) .and_then(|x| x.try_into().ok()) .ok_or_bad_request("invalid x-amz-checksum-sha256 header")?; - Ok(Some(ChecksumValue::Sha256(sha256))) + Ok(ChecksumValue::Sha256(sha256)) } - Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")), - None => Ok(None), } } diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index 75f3bf80..70b6e004 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use futures::prelude::*; use futures::task; use hmac::Mac; -use http::header::{HeaderValue, CONTENT_ENCODING}; +use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING}; use hyper::body::{Bytes, Frame, Incoming as IncomingBody}; use hyper::Request; @@ -64,10 +64,16 @@ pub fn parse_streaming_body( } // If trailer header is announced, add the calculation of the requested checksum - if trailer { - let algo = request_trailer_checksum_algorithm(req.headers())?; + let trailer_algorithm = if trailer { + let algo = Some( + request_trailer_checksum_algorithm(req.headers())? + .ok_or_bad_request("Missing x-amz-trailer header")?, + ); checksummer = checksummer.add(algo); - } + algo + } else { + None + }; // For signed variants, determine signing parameters let sign_params = if signed { @@ -123,6 +129,7 @@ pub fn parse_streaming_body( stream: Mutex::new(signed_payload_stream.boxed()), checksummer, expected_checksums, + trailer_algorithm, } })) } @@ -132,6 +139,7 @@ pub fn parse_streaming_body( stream: Mutex::new(stream.boxed()), checksummer, expected_checksums, + trailer_algorithm: None, } })), } @@ -185,6 +193,8 @@ fn compute_streaming_trailer_signature( } mod payload { + use http::{HeaderName, HeaderValue}; + use garage_util::data::Hash; use nom::bytes::streaming::{tag, take_while}; @@ -252,19 +262,21 @@ mod payload { #[derive(Debug, Clone)] pub struct TrailerChunk { - pub header_name: Vec, - pub header_value: Vec, + pub header_name: HeaderName, + pub header_value: HeaderValue, pub signature: Option, } impl TrailerChunk { fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { - let (input, header_name) = try_parse!(take_while( - |c: u8| c.is_ascii_alphanumeric() || c == b'-' + let (input, header_name) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'), + HeaderName::from_bytes )(input)); let (input, _) = try_parse!(tag(b":")(input)); - let (input, header_value) = try_parse!(take_while( - |c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c) + let (input, header_value) = try_parse!(map_res( + take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)), + HeaderValue::from_bytes )(input)); // Possible '\n' after the header value, depends on clients @@ -276,8 +288,8 @@ mod payload { Ok(( input, TrailerChunk { - header_name: header_name.to_vec(), - header_value: header_value.to_vec(), + header_name, + header_value, signature: None, }, )) @@ -371,6 +383,7 @@ where buf: bytes::BytesMut, signing: Option, has_trailer: bool, + done: bool, } impl StreamingPayloadStream @@ -383,6 +396,7 @@ where buf: bytes::BytesMut::new(), signing, has_trailer, + done: false, } } @@ -448,6 +462,10 @@ where let mut this = self.project(); + if *this.done { + return Poll::Ready(None); + } + loop { let (input, payload) = match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) { @@ -499,17 +517,23 @@ where if data.is_empty() { // if there was a trailer, it would have been returned by the parser assert!(!*this.has_trailer); + *this.done = true; return Poll::Ready(None); } return Poll::Ready(Some(Ok(Frame::data(data)))); } StreamingPayloadChunk::Trailer(trailer) => { + trace!( + "In StreamingPayloadStream::poll_next: got trailer {:?}", + trailer + ); + if let Some(signing) = this.signing.as_mut() { let data = [ - &trailer.header_name[..], + trailer.header_name.as_ref(), &b":"[..], - &trailer.header_value[..], + trailer.header_value.as_ref(), &b"\n"[..], ] .concat(); @@ -529,10 +553,12 @@ where } *this.buf = input.into(); + *this.done = true; - // TODO: handle trailer + let mut trailers_map = HeaderMap::new(); + trailers_map.insert(trailer.header_name, trailer.header_value); - return Poll::Ready(None); + return Poll::Ready(Some(Ok(Frame::trailers(trailers_map)))); } } } diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 6c1b7453..350684da 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -218,6 +218,7 @@ pub async fn handle_post_object( // around here to make sure the rest of the machinery takes our acl into account. let headers = get_headers(¶ms)?; + let checksum_algorithm = request_checksum_algorithm(¶ms)?; let expected_checksums = ExpectedChecksums { md5: params .get("content-md5") @@ -225,7 +226,9 @@ pub async fn handle_post_object( .transpose()? .map(str::to_string), sha256: None, - extra: request_checksum_algorithm_value(¶ms)?, + extra: checksum_algorithm + .map(|algo| extract_checksum_value(¶ms, algo)) + .transpose()?, }; let meta = ObjectVersionMetaInner { diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index c036f000..5860cf97 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -75,6 +75,7 @@ static_init.workspace = true assert-json-diff.workspace = true serde_json.workspace = true base64.workspace = true +crc32fast.workspace = true k2v-client.workspace = true diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 99fd4385..6a8eed38 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -195,10 +195,10 @@ impl<'a> RequestBuilder<'a> { all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap()); all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap()); - let body_sha = match self.body_signature { + let body_sha = match &self.body_signature { BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(), BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)), - BodySignature::Streaming(size) => { + BodySignature::Streaming { chunk_size } => { all_headers.insert( CONTENT_ENCODING, HeaderValue::from_str("aws-chunked").unwrap(), @@ -213,15 +213,56 @@ impl<'a> RequestBuilder<'a> { // code. all_headers.insert( CONTENT_LENGTH, - to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "") - .len() - .to_string() - .try_into() - .unwrap(), + to_streaming_body( + &self.body, + *chunk_size, + String::new(), + signer.clone(), + now, + "", + ) + .len() + .to_string() + .try_into() + .unwrap(), ); "STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned() } + BodySignature::StreamingUnsignedTrailer { + chunk_size, + trailer_algorithm, + trailer_value, + } => { + all_headers.insert( + CONTENT_ENCODING, + HeaderValue::from_str("aws-chunked").unwrap(), + ); + all_headers.insert( + HeaderName::from_static("x-amz-decoded-content-length"), + HeaderValue::from_str(&self.body.len().to_string()).unwrap(), + ); + all_headers.insert( + HeaderName::from_static("x-amz-trailer"), + HeaderValue::from_str(&trailer_algorithm).unwrap(), + ); + + all_headers.insert( + CONTENT_LENGTH, + to_streaming_unsigned_trailer_body( + &self.body, + *chunk_size, + &trailer_algorithm, + &trailer_value, + ) + .len() + .to_string() + .try_into() + .unwrap(), + ); + + "STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned() + } }; all_headers.insert( signature::X_AMZ_CONTENT_SHA256, @@ -273,10 +314,26 @@ impl<'a> RequestBuilder<'a> { let mut request = Request::builder(); *request.headers_mut().unwrap() = all_headers; - let body = if let BodySignature::Streaming(size) = self.body_signature { - to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope) - } else { - self.body.clone() + let body = match &self.body_signature { + BodySignature::Streaming { chunk_size } => to_streaming_body( + &self.body, + *chunk_size, + signature, + streaming_signer, + now, + &scope, + ), + BodySignature::StreamingUnsignedTrailer { + chunk_size, + trailer_algorithm, + trailer_value, + } => to_streaming_unsigned_trailer_body( + &self.body, + *chunk_size, + &trailer_algorithm, + &trailer_value, + ), + _ => self.body.clone(), }; let request = request .uri(uri) @@ -305,7 +362,14 @@ impl<'a> RequestBuilder<'a> { pub enum BodySignature { Unsigned, Classic, - Streaming(usize), + Streaming { + chunk_size: usize, + }, + StreamingUnsignedTrailer { + chunk_size: usize, + trailer_algorithm: String, + trailer_value: String, + }, } fn query_param_to_string(params: &HashMap>) -> String { @@ -360,3 +424,26 @@ fn to_streaming_body( res } + +fn to_streaming_unsigned_trailer_body( + body: &[u8], + chunk_size: usize, + trailer_algorithm: &str, + trailer_value: &str, +) -> Vec { + let mut res = Vec::with_capacity(body.len()); + for chunk in body.chunks(chunk_size) { + let header = format!("{:x}\r\n", chunk.len()); + res.extend_from_slice(header.as_bytes()); + res.extend_from_slice(chunk); + res.extend_from_slice(b"\r\n"); + } + + res.extend_from_slice(b"0\r\n"); + res.extend_from_slice(trailer_algorithm.as_bytes()); + res.extend_from_slice(b":"); + res.extend_from_slice(trailer_value.as_bytes()); + res.extend_from_slice(b"\n\r\n\r\n"); + + res +} diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index da6c624b..8d71504f 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -99,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}" .arg("server") .stdout(stdout) .stderr(stderr) - .env("RUST_LOG", "garage=debug,garage_api=trace") + .env( + "RUST_LOG", + "garage=debug,garage_api_common=trace,garage_api_s3=trace", + ) .spawn() .expect("Could not start garage"); diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index 351aa422..e83d2355 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -1,5 +1,8 @@ use std::collections::HashMap; +use base64::prelude::*; +use crc32fast::Hasher as Crc32; + use crate::common; use crate::common::ext::CommandExt; use common::custom_requester::BodySignature; @@ -21,7 +24,7 @@ async fn test_putobject_streaming() { let content_type = "text/csv"; let mut headers = HashMap::new(); headers.insert("content-type".to_owned(), content_type.to_owned()); - let _ = ctx + let res = ctx .custom_request .builder(bucket.clone()) .method(Method::PUT) @@ -29,10 +32,11 @@ async fn test_putobject_streaming() { .signed_headers(headers) .vhost_style(true) .body(vec![]) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); // assert_eq!(r.e_tag.unwrap().as_str(), etag); // We return a version ID here @@ -65,7 +69,7 @@ async fn test_putobject_streaming() { { let etag = "\"46cf18a9b447991b450cad3facf5937e\""; - let _ = ctx + let res = ctx .custom_request .builder(bucket.clone()) .method(Method::PUT) @@ -74,10 +78,144 @@ async fn test_putobject_streaming() { .path("abc".to_owned()) .vhost_style(true) .body(BODY.to_vec()) - .body_signature(BodySignature::Streaming(16)) + .body_signature(BodySignature::Streaming { chunk_size: 16 }) .send() .await .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); + + // assert_eq!(r.e_tag.unwrap().as_str(), etag); + // assert!(r.version_id.is_some()); + + let o = ctx + .client + .get_object() + .bucket(&bucket) + //.key(CTRL_KEY) + .key("abc") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, BODY); + assert_eq!(o.e_tag.unwrap(), etag); + assert!(o.last_modified.is_some()); + assert_eq!(o.content_length.unwrap(), 62); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); + } +} + +#[tokio::test] +async fn test_putobject_streaming_unsigned_trailer() { + let ctx = common::context(); + let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer"); + + { + // Send an empty object (can serve as a directory marker) + // with a content type + let etag = "\"d41d8cd98f00b204e9800998ecf8427e\""; + let content_type = "text/csv"; + let mut headers = HashMap::new(); + headers.insert("content-type".to_owned(), content_type.to_owned()); + + let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]); + + let res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + .path(STD_KEY.to_owned()) + .signed_headers(headers) + .vhost_style(true) + .body(vec![]) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 10, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: empty_crc32, + }) + .send() + .await + .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); + + // assert_eq!(r.e_tag.unwrap().as_str(), etag); + // We return a version ID here + // We should check if Amazon is returning one when versioning is not enabled + // assert!(r.version_id.is_some()); + + //let _version = r.version_id.unwrap(); + + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key(STD_KEY) + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, b""); + assert_eq!(o.e_tag.unwrap(), etag); + // We do not return version ID + // We should check if Amazon is returning one when versioning is not enabled + // assert_eq!(o.version_id.unwrap(), _version); + assert_eq!(o.content_type.unwrap(), content_type); + assert!(o.last_modified.is_some()); + assert_eq!(o.content_length.unwrap(), 0); + assert_eq!(o.parts_count, None); + assert_eq!(o.tag_count, None); + } + + { + let etag = "\"46cf18a9b447991b450cad3facf5937e\""; + + let mut crc32 = Crc32::new(); + crc32.update(&BODY[..]); + let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]); + + // try sending with wrong crc32, check that it fails + let err_res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this + //fail + .path("abc".to_owned()) + .vhost_style(true) + .body(BODY.to_vec()) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 16, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: "2Yp9Yw==".into(), + }) + .send() + .await + .unwrap(); + assert!( + err_res.status().is_client_error(), + "got response: {:?}", + err_res + ); + + let res = ctx + .custom_request + .builder(bucket.clone()) + .method(Method::PUT) + //.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this + //fail + .path("abc".to_owned()) + .vhost_style(true) + .body(BODY.to_vec()) + .body_signature(BodySignature::StreamingUnsignedTrailer { + chunk_size: 16, + trailer_algorithm: "x-amz-checksum-crc32".into(), + trailer_value: crc32, + }) + .send() + .await + .unwrap(); + assert!(res.status().is_success(), "got response: {:?}", res); // assert_eq!(r.e_tag.unwrap().as_str(), etag); // assert!(r.version_id.is_some()); @@ -119,7 +257,7 @@ async fn test_create_bucket_streaming() { .custom_request .builder(bucket.to_owned()) .method(Method::PUT) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap(); @@ -174,7 +312,7 @@ async fn test_put_website_streaming() { .method(Method::PUT) .query_params(query) .body(website_config.as_bytes().to_vec()) - .body_signature(BodySignature::Streaming(10)) + .body_signature(BodySignature::Streaming { chunk_size: 10 }) .send() .await .unwrap();