api: validate trailing checksum + add test for unsigned-payload-trailer

Alex 2025-02-18 13:59:43 +01:00
parent ccab0e4ae5
commit 730bfee753
9 changed files with 337 additions and 88 deletions
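For reference, a minimal sketch (not part of the diff) of the STREAMING-UNSIGNED-PAYLOAD-TRAILER body that this change parses and validates, assuming a single data chunk and a CRC32 trailer; it mirrors the format built by the test helper to_streaming_unsigned_trailer_body() added below:

use base64::prelude::*;
use crc32fast::Hasher as Crc32;

// Sketch only: build an unsigned aws-chunked body with a trailing CRC32 checksum,
// the same layout the streaming parser consumes and the new test sends.
fn example_unsigned_trailer_body(payload: &[u8]) -> Vec<u8> {
    let mut crc = Crc32::new();
    crc.update(payload);
    let trailer_value = BASE64_STANDARD.encode(u32::to_be_bytes(crc.finalize()));

    let mut body = Vec::new();
    // data chunk: "<size in hex>\r\n<bytes>\r\n" (a real client may emit several chunks)
    body.extend_from_slice(format!("{:x}\r\n", payload.len()).as_bytes());
    body.extend_from_slice(payload);
    body.extend_from_slice(b"\r\n");
    // zero-size chunk terminates the data, then the trailer header follows
    body.extend_from_slice(b"0\r\n");
    body.extend_from_slice(format!("x-amz-checksum-crc32:{}\n\r\n\r\n", trailer_value).as_bytes());
    body
}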

Cargo.lock generated

@@ -1230,6 +1230,7 @@ dependencies = [
"bytes",
"bytesize",
"chrono",
"crc32fast",
"format_table",
"futures",
"garage_api_admin",


@@ -17,6 +17,7 @@ pub struct ReqBody {
pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub(crate) checksummer: Checksummer,
pub(crate) expected_checksums: ExpectedChecksums,
pub(crate) trailer_algorithm: Option<ChecksumAlgorithm>,
}
pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>;
@@ -74,6 +75,7 @@ impl ReqBody {
stream,
mut checksummer,
mut expected_checksums,
trailer_algorithm,
} = self;
let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1);
@@ -91,18 +93,21 @@ impl ReqBody {
}
Err(frame) => {
let trailers = frame.into_trailers().unwrap();
if let Some(cv) = request_checksum_value(&trailers)? {
expected_checksums.extra = Some(cv);
}
let algo = trailer_algorithm.unwrap();
expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?);
break;
}
}
}
if trailer_algorithm.is_some() && expected_checksums.extra.is_none() {
return Err(Error::bad_request("trailing checksum was not sent"));
}
let checksums = checksummer.finalize();
checksums.verify(&expected_checksums)?;
return Ok(checksums);
Ok(checksums)
});
let stream: BoxStream<_> = stream.into_inner().unwrap();


@@ -12,10 +12,10 @@ use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
use super::*;
pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
@@ -198,17 +198,23 @@ impl Checksums {
// ----
pub fn parse_checksum_algorithm(algo: &str) -> Result<ChecksumAlgorithm, Error> {
match algo {
"CRC32" => Ok(ChecksumAlgorithm::Crc32),
"CRC32C" => Ok(ChecksumAlgorithm::Crc32c),
"SHA1" => Ok(ChecksumAlgorithm::Sha1),
"SHA256" => Ok(ChecksumAlgorithm::Sha256),
_ => Err(Error::bad_request("invalid checksum algorithm")),
}
}
/// Extract the value of the x-amz-checksum-algorithm header
pub fn request_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
None => Ok(None),
Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
_ => Err(Error::bad_request("invalid checksum algorithm")),
Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some),
}
}
@@ -231,37 +237,17 @@ pub fn request_checksum_value(
) -> Result<Option<ChecksumValue>, Error> {
let mut ret = vec![];
if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
let crc32 = BASE64_STANDARD
.decode(&crc32_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
ret.push(ChecksumValue::Crc32(crc32))
if headers.contains_key(X_AMZ_CHECKSUM_CRC32) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?);
}
if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
let crc32c = BASE64_STANDARD
.decode(&crc32c_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
ret.push(ChecksumValue::Crc32c(crc32c))
if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?);
}
if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
let sha1 = BASE64_STANDARD
.decode(&sha1_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
ret.push(ChecksumValue::Sha1(sha1))
if headers.contains_key(X_AMZ_CHECKSUM_SHA1) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?);
}
if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
let sha256 = BASE64_STANDARD
.decode(&sha256_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
ret.push(ChecksumValue::Sha256(sha256))
if headers.contains_key(X_AMZ_CHECKSUM_SHA256) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?);
}
if ret.len() > 1 {
@@ -274,44 +260,43 @@ pub fn request_checksum_value(
/// Checks for the presence of x-amz-checksum-algorithm
/// if so extract the corresponding x-amz-checksum-* value
pub fn request_checksum_algorithm_value(
pub fn extract_checksum_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
Some(x) if x == "CRC32" => {
algo: ChecksumAlgorithm,
) -> Result<ChecksumValue, Error> {
match algo {
ChecksumAlgorithm::Crc32 => {
let crc32 = headers
.get(X_AMZ_CHECKSUM_CRC32)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
Ok(Some(ChecksumValue::Crc32(crc32)))
Ok(ChecksumValue::Crc32(crc32))
}
Some(x) if x == "CRC32C" => {
ChecksumAlgorithm::Crc32c => {
let crc32c = headers
.get(X_AMZ_CHECKSUM_CRC32C)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
Ok(Some(ChecksumValue::Crc32c(crc32c)))
Ok(ChecksumValue::Crc32c(crc32c))
}
Some(x) if x == "SHA1" => {
ChecksumAlgorithm::Sha1 => {
let sha1 = headers
.get(X_AMZ_CHECKSUM_SHA1)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
Ok(Some(ChecksumValue::Sha1(sha1)))
Ok(ChecksumValue::Sha1(sha1))
}
Some(x) if x == "SHA256" => {
ChecksumAlgorithm::Sha256 => {
let sha256 = headers
.get(X_AMZ_CHECKSUM_SHA256)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
Ok(Some(ChecksumValue::Sha256(sha256)))
Ok(ChecksumValue::Sha256(sha256))
}
Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
None => Ok(None),
}
}
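As a usage sketch (names as defined above; the algorithm string is illustrative), the two helpers compose as follows: the announced algorithm is parsed once, then the matching x-amz-checksum-* header is extracted from a header map such as the trailer frame:

use http::{HeaderMap, HeaderValue};

// Sketch only: resolve an announced algorithm, then read the matching header value.
fn example_trailer_checksum(trailers: &HeaderMap<HeaderValue>) -> Result<ChecksumValue, Error> {
    let algo = parse_checksum_algorithm("CRC32")?; // ChecksumAlgorithm::Crc32
    extract_checksum_value(trailers, algo)         // decodes x-amz-checksum-crc32
}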


@@ -5,7 +5,7 @@ use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
use http::header::{HeaderValue, CONTENT_ENCODING};
use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING};
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
@@ -64,10 +64,16 @@ pub fn parse_streaming_body(
}
// If trailer header is announced, add the calculation of the requested checksum
if trailer {
let algo = request_trailer_checksum_algorithm(req.headers())?;
let trailer_algorithm = if trailer {
let algo = Some(
request_trailer_checksum_algorithm(req.headers())?
.ok_or_bad_request("Missing x-amz-trailer header")?,
);
checksummer = checksummer.add(algo);
}
algo
} else {
None
};
// For signed variants, determine signing parameters
let sign_params = if signed {
@@ -123,6 +129,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(signed_payload_stream.boxed()),
checksummer,
expected_checksums,
trailer_algorithm,
}
}))
}
@@ -132,6 +139,7 @@ pub fn parse_streaming_body(
stream: Mutex::new(stream.boxed()),
checksummer,
expected_checksums,
trailer_algorithm: None,
}
})),
}
@@ -185,6 +193,8 @@ fn compute_streaming_trailer_signature(
}
mod payload {
use http::{HeaderName, HeaderValue};
use garage_util::data::Hash;
use nom::bytes::streaming::{tag, take_while};
@@ -252,19 +262,21 @@ mod payload {
#[derive(Debug, Clone)]
pub struct TrailerChunk {
pub header_name: Vec<u8>,
pub header_value: Vec<u8>,
pub header_name: HeaderName,
pub header_value: HeaderValue,
pub signature: Option<Hash>,
}
impl TrailerChunk {
fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
let (input, header_name) = try_parse!(take_while(
|c: u8| c.is_ascii_alphanumeric() || c == b'-'
let (input, header_name) = try_parse!(map_res(
take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'),
HeaderName::from_bytes
)(input));
let (input, _) = try_parse!(tag(b":")(input));
let (input, header_value) = try_parse!(take_while(
|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)
let (input, header_value) = try_parse!(map_res(
take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)),
HeaderValue::from_bytes
)(input));
// Possible '\n' after the header value, depends on clients
@@ -276,8 +288,8 @@ mod payload {
Ok((
input,
TrailerChunk {
header_name: header_name.to_vec(),
header_value: header_value.to_vec(),
header_name,
header_value,
signature: None,
},
))
@@ -371,6 +383,7 @@ where
buf: bytes::BytesMut,
signing: Option<SignParams>,
has_trailer: bool,
done: bool,
}
impl<S> StreamingPayloadStream<S>
@@ -383,6 +396,7 @@ where
buf: bytes::BytesMut::new(),
signing,
has_trailer,
done: false,
}
}
@@ -448,6 +462,10 @@ where
let mut this = self.project();
if *this.done {
return Poll::Ready(None);
}
loop {
let (input, payload) =
match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) {
@@ -499,17 +517,23 @@ where
if data.is_empty() {
// if there was a trailer, it would have been returned by the parser
assert!(!*this.has_trailer);
*this.done = true;
return Poll::Ready(None);
}
return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
trace!(
"In StreamingPayloadStream::poll_next: got trailer {:?}",
trailer
);
if let Some(signing) = this.signing.as_mut() {
let data = [
&trailer.header_name[..],
trailer.header_name.as_ref(),
&b":"[..],
&trailer.header_value[..],
trailer.header_value.as_ref(),
&b"\n"[..],
]
.concat();
@@ -529,10 +553,12 @@ where
}
*this.buf = input.into();
*this.done = true;
// TODO: handle trailer
let mut trailers_map = HeaderMap::new();
trailers_map.insert(trailer.header_name, trailer.header_value);
return Poll::Ready(None);
return Poll::Ready(Some(Ok(Frame::trailers(trailers_map))));
}
}
}


@@ -218,6 +218,7 @@ pub async fn handle_post_object(
// around here to make sure the rest of the machinery takes our acl into account.
let headers = get_headers(&params)?;
let checksum_algorithm = request_checksum_algorithm(&params)?;
let expected_checksums = ExpectedChecksums {
md5: params
.get("content-md5")
@@ -225,7 +226,9 @@ pub async fn handle_post_object(
.transpose()?
.map(str::to_string),
sha256: None,
extra: request_checksum_algorithm_value(&params)?,
extra: checksum_algorithm
.map(|algo| extract_checksum_value(&params, algo))
.transpose()?,
};
let meta = ObjectVersionMetaInner {


@@ -75,6 +75,7 @@ static_init.workspace = true
assert-json-diff.workspace = true
serde_json.workspace = true
base64.workspace = true
crc32fast.workspace = true
k2v-client.workspace = true


@@ -195,10 +195,10 @@ impl<'a> RequestBuilder<'a> {
all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap());
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
let body_sha = match self.body_signature {
let body_sha = match &self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
BodySignature::Streaming(size) => {
BodySignature::Streaming { chunk_size } => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
@@ -213,15 +213,56 @@ impl<'a> RequestBuilder<'a> {
// code.
all_headers.insert(
CONTENT_LENGTH,
to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
.len()
.to_string()
.try_into()
.unwrap(),
to_streaming_body(
&self.body,
*chunk_size,
String::new(),
signer.clone(),
now,
"",
)
.len()
.to_string()
.try_into()
.unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
BodySignature::StreamingUnsignedTrailer {
chunk_size,
trailer_algorithm,
trailer_value,
} => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
);
all_headers.insert(
HeaderName::from_static("x-amz-decoded-content-length"),
HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
);
all_headers.insert(
HeaderName::from_static("x-amz-trailer"),
HeaderValue::from_str(&trailer_algorithm).unwrap(),
);
all_headers.insert(
CONTENT_LENGTH,
to_streaming_unsigned_trailer_body(
&self.body,
*chunk_size,
&trailer_algorithm,
&trailer_value,
)
.len()
.to_string()
.try_into()
.unwrap(),
);
"STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned()
}
};
all_headers.insert(
signature::X_AMZ_CONTENT_SHA256,
@@ -273,10 +314,26 @@ impl<'a> RequestBuilder<'a> {
let mut request = Request::builder();
*request.headers_mut().unwrap() = all_headers;
let body = if let BodySignature::Streaming(size) = self.body_signature {
to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
} else {
self.body.clone()
let body = match &self.body_signature {
BodySignature::Streaming { chunk_size } => to_streaming_body(
&self.body,
*chunk_size,
signature,
streaming_signer,
now,
&scope,
),
BodySignature::StreamingUnsignedTrailer {
chunk_size,
trailer_algorithm,
trailer_value,
} => to_streaming_unsigned_trailer_body(
&self.body,
*chunk_size,
&trailer_algorithm,
&trailer_value,
),
_ => self.body.clone(),
};
let request = request
.uri(uri)
@@ -305,7 +362,14 @@ impl<'a> RequestBuilder<'a> {
pub enum BodySignature {
Unsigned,
Classic,
Streaming(usize),
Streaming {
chunk_size: usize,
},
StreamingUnsignedTrailer {
chunk_size: usize,
trailer_algorithm: String,
trailer_value: String,
},
}
fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String {
@@ -360,3 +424,26 @@ fn to_streaming_body(
res
}
fn to_streaming_unsigned_trailer_body(
body: &[u8],
chunk_size: usize,
trailer_algorithm: &str,
trailer_value: &str,
) -> Vec<u8> {
let mut res = Vec::with_capacity(body.len());
for chunk in body.chunks(chunk_size) {
let header = format!("{:x}\r\n", chunk.len());
res.extend_from_slice(header.as_bytes());
res.extend_from_slice(chunk);
res.extend_from_slice(b"\r\n");
}
res.extend_from_slice(b"0\r\n");
res.extend_from_slice(trailer_algorithm.as_bytes());
res.extend_from_slice(b":");
res.extend_from_slice(trailer_value.as_bytes());
res.extend_from_slice(b"\n\r\n\r\n");
res
}


@@ -99,7 +99,10 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
.env("RUST_LOG", "garage=debug,garage_api=trace")
.env(
"RUST_LOG",
"garage=debug,garage_api_common=trace,garage_api_s3=trace",
)
.spawn()
.expect("Could not start garage");


@@ -1,5 +1,8 @@
use std::collections::HashMap;
use base64::prelude::*;
use crc32fast::Hasher as Crc32;
use crate::common;
use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
@@ -21,7 +24,7 @@ async fn test_putobject_streaming() {
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
let _ = ctx
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -29,10 +32,11 @@ async fn test_putobject_streaming() {
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::Streaming(10))
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
@@ -65,7 +69,7 @@ async fn test_putobject_streaming() {
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let _ = ctx
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@@ -74,10 +78,144 @@ async fn test_putobject_streaming() {
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
.body_signature(BodySignature::Streaming(16))
.body_signature(BodySignature::Streaming { chunk_size: 16 })
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
let o = ctx
.client
.get_object()
.bucket(&bucket)
//.key(CTRL_KEY)
.key("abc")
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, BODY);
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length.unwrap(), 62);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
}
}
#[tokio::test]
async fn test_putobject_streaming_unsigned_trailer() {
let ctx = common::context();
let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer");
{
// Send an empty object (can serve as a directory marker)
// with a content type
let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]);
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
.path(STD_KEY.to_owned())
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 10,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: empty_crc32,
})
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
// We should check if Amazon is returning one when versioning is not enabled
// assert!(r.version_id.is_some());
//let _version = r.version_id.unwrap();
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, b"");
assert_eq!(o.e_tag.unwrap(), etag);
// We do not return version ID
// We should check if Amazon is returning one when versioning is not enabled
// assert_eq!(o.version_id.unwrap(), _version);
assert_eq!(o.content_type.unwrap(), content_type);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length.unwrap(), 0);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
}
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let mut crc32 = Crc32::new();
crc32.update(&BODY[..]);
let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
// try sending with wrong crc32, check that it fails
let err_res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
//.path(CTRL_KEY.to_owned()) at the moment custom_request does not URL-encode the
//path, so this would fail
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 16,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: "2Yp9Yw==".into(),
})
.send()
.await
.unwrap();
assert!(
err_res.status().is_client_error(),
"got response: {:?}",
err_res
);
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
//.path(CTRL_KEY.to_owned()) at the moment custom_request does not URL-encode the
//path, so this would fail
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 16,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: crc32,
})
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
@@ -119,7 +257,7 @@ async fn test_create_bucket_streaming() {
.custom_request
.builder(bucket.to_owned())
.method(Method::PUT)
.body_signature(BodySignature::Streaming(10))
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();
@@ -174,7 +312,7 @@ async fn test_put_website_streaming() {
.method(Method::PUT)
.query_params(query)
.body(website_config.as_bytes().to_vec())
.body_signature(BodySignature::Streaming(10))
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.send()
.await
.unwrap();