From b45dcc1925d76f3f7dcae7deea0391953ba548e5 Mon Sep 17 00:00:00 2001 From: Jill Date: Mon, 17 Jan 2022 10:55:31 +0100 Subject: [PATCH] Support STREAMING-AWS4-HMAC-SHA256-PAYLOAD (#64) (#156) Closes #64. Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/156 Co-authored-by: Jill Co-committed-by: Jill --- Cargo.lock | 39 +++ Cargo.nix | 53 +++ default.nix | 2 +- script/dev-env-mc.sh | 2 +- src/api/Cargo.toml | 2 + src/api/api_server.rs | 6 +- src/api/s3_bucket.rs | 13 +- src/api/s3_delete.rs | 5 +- src/api/s3_put.rs | 110 ++++-- src/api/s3_website.rs | 5 +- src/api/signature/mod.rs | 47 +++ .../{signature.rs => signature/payload.rs} | 52 +-- src/api/signature/streaming.rs | 319 ++++++++++++++++++ 13 files changed, 578 insertions(+), 77 deletions(-) create mode 100644 src/api/signature/mod.rs rename src/api/{signature.rs => signature/payload.rs} (83%) create mode 100644 src/api/signature/streaming.rs diff --git a/Cargo.lock b/Cargo.lock index 033d157f..6f4b6b42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,7 +433,9 @@ dependencies = [ "idna", "log", "md-5", + "nom", "percent-encoding", + "pin-project", "quick-xml", "roxmltree", "serde", @@ -967,6 +969,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.13" @@ -1011,6 +1019,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1092,6 +1111,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" diff --git a/Cargo.nix b/Cargo.nix index 53e93c34..44aa42e8 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -671,7 +671,9 @@ in idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; }; md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; }; + nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; }; percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; }; + pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; }; quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; }; roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; }; serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; }; @@ -1321,6 +1323,16 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec { + name = "minimal-lexical"; + version = "0.2.1"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + }); + "registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec { name = "mio"; version = "0.7.13"; @@ -1381,6 +1393,25 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec { + name = "nom"; + version = "7.1.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; }; + features = builtins.concatLists [ + [ "alloc" ] + [ "default" ] + [ "std" ] + ]; + dependencies = { + memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; }; + minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; }; + }; + buildDependencies = { + version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec { name = "ntapi"; version = "0.3.6"; @@ -1490,6 +1521,28 @@ in src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; }; }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; }; + dependencies = { + pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; }; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project-internal"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; }; + dependencies = { + proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; }; + quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; }; + syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec { name = "pin-project-lite"; version = "0.2.7"; diff --git a/default.nix b/default.nix index faf6f522..108d4804 100644 --- a/default.nix +++ b/default.nix @@ -76,7 +76,7 @@ in let */ ''^(src|tests)'' # fixed default ''.*\.(rs|toml)$'' # fixed default - ''^(crdt|replication|cli|helper)'' # our crate submodules + ''^(crdt|replication|cli|helper|signature)'' # our crate submodules ]; }; diff --git a/script/dev-env-mc.sh b/script/dev-env-mc.sh index 98b63047..3f58b0a8 100644 --- a/script/dev-env-mc.sh +++ b/script/dev-env-mc.sh @@ -8,7 +8,7 @@ cat > /tmp/garage.mc/config.json <, req: Request) -> Result, Error> { - let (api_key, content_sha256) = check_signature(&garage, &req).await?; + let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?; let authority = req .headers() @@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { - handle_put(garage, req, bucket_id, &key, content_sha256).await + handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await } Endpoint::AbortMultipartUpload { key, upload_id, .. } => { handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 494224c8..8a5407d3 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -120,7 +120,10 @@ pub async fn handle_create_bucket( bucket_name: String, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; @@ -320,7 +323,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), @@ -329,8 +332,8 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - - Europe + + Europe "# ), @@ -339,7 +342,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 9e267490..b243d982 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -81,7 +81,10 @@ pub async fn handle_delete_objects( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..4e85664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,10 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use futures::{prelude::*, TryFutureExt}; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -14,19 +16,23 @@ use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; -use crate::signature::verify_signed_content; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::LONG_DATETIME; +use crate::signature::{compute_scope, verify_signed_content}; pub async fn handle_put( garage: Arc, req: Request, bucket_id: Uuid, key: &str, - content_sha256: Option, + api_key: &Key, + mut content_sha256: Option, ) -> Result, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -40,11 +46,53 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file - let body = req.into_body(); + let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); - let mut chunker = BodyChunker::new(body, garage.config.block_size); + let body = if let Some(signature) = payload_seed_signature { + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = head + .headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, &garage.config.s3_api.s3_region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; + + SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)? + .map_err(Error::from) + .boxed() + } else { + body.boxed() + }; + + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +226,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +253,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,32 +317,34 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker>> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl> + Unpin> StreamChunker { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result>, GarageError> { + + async fn next(&mut self) -> Result>, Error> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +421,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted @@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml) diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index fcf8cba3..c4a43e2c 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -80,7 +80,10 @@ pub async fn handle_put_website( content_sha256: Option, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let mut bucket = garage .bucket_table diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs new file mode 100644 index 00000000..ebdee6da --- /dev/null +++ b/src/api/signature/mod.rs @@ -0,0 +1,47 @@ +use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac, NewMac}; +use sha2::Sha256; + +use garage_util::data::{sha256sum, Hash}; + +use crate::error::*; + +pub mod payload; +pub mod streaming; + +pub const SHORT_DATE: &str = "%Y%m%d"; +pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; + +type HmacSha256 = Hmac; + +pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { + if expected_sha256 != sha256sum(body) { + return Err(Error::BadRequest( + "Request content hash does not match signed hash".to_string(), + )); + } + Ok(()) +} + +pub fn signing_hmac( + datetime: &DateTime, + secret_key: &str, + region: &str, + service: &str, +) -> Result { + let secret = String::from("AWS4") + secret_key; + let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); + let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + region_hmac.update(region.as_bytes()); + let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + service_hmac.update(service.as_bytes()); + let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + signing_hmac.update(b"aws4_request"); + let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + Ok(hmac) +} + +pub fn compute_scope(datetime: &DateTime, region: &str) -> String { + format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +} diff --git a/src/api/signature.rs b/src/api/signature/payload.rs similarity index 83% rename from src/api/signature.rs rename to src/api/signature/payload.rs index 311e6a9a..b13819a8 100644 --- a/src/api/signature.rs +++ b/src/api/signature/payload.rs @@ -1,25 +1,23 @@ use std::collections::HashMap; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::Mac; use hyper::{Body, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; -use garage_util::data::{sha256sum, Hash}; +use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; +use super::signing_hmac; +use super::{LONG_DATETIME, SHORT_DATE}; + use crate::encoding::uri_encode; use crate::error::*; -const SHORT_DATE: &str = "%Y%m%d"; -const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; - -type HmacSha256 = Hmac; - -pub async fn check_signature( +pub async fn check_payload_signature( garage: &Garage, request: &Request, ) -> Result<(Key, Option), Error> { @@ -97,13 +95,13 @@ pub async fn check_signature( let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { None + } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) } else { let bytes = hex::decode(authorization.content_sha256) .ok_or_bad_request("Invalid content sha256 hash")?; - Some( - Hash::try_from(&bytes[..]) - .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, - ) + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) }; Ok((key, content_sha256)) @@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime, scope_string: &str, canonical_req: & .join("\n") } -fn signing_hmac( - datetime: &DateTime, - secret_key: &str, - region: &str, - service: &str, -) -> Result { - let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; - date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; - region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; - service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; - signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; - Ok(hmac) -} - fn canonical_request( method: &Method, url_path: &str, @@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { "".to_string() } } - -pub fn verify_signed_content(content_sha256: Option, body: &[u8]) -> Result<(), Error> { - let expected_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs new file mode 100644 index 00000000..b2dc1591 --- /dev/null +++ b/src/api/signature/streaming.rs @@ -0,0 +1,319 @@ +use std::pin::Pin; + +use chrono::{DateTime, Utc}; +use futures::prelude::*; +use futures::task; +use hyper::body::Bytes; + +use garage_util::data::Hash; +use hmac::Mac; + +use super::sha256sum; +use super::HmacSha256; +use super::LONG_DATETIME; + +use crate::error::*; + +/// Result of `sha256("")` +const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +fn compute_streaming_payload_signature( + signing_hmac: &HmacSha256, + date: DateTime, + scope: &str, + previous_signature: Hash, + content_sha256: Hash, +) -> Result { + let string_to_sign = [ + "AWS4-HMAC-SHA256-PAYLOAD", + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + EMPTY_STRING_HEX_DIGEST, + &hex::encode(content_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") +} + +mod payload { + use garage_util::data::Hash; + + pub enum Error { + Parser(nom::error::Error), + BadSignature, + } + + impl Error { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct Header { + pub size: usize, + pub signature: Hash, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = Header { + size: size as usize, + signature, + }; + + Ok((input, header)) + } + } +} + +#[derive(Debug)] +pub enum SignedPayloadStreamError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl SignedPayloadStreamError { + fn message(msg: &str) -> Self { + SignedPayloadStreamError::Message(msg.into()) + } +} + +impl From for Error { + fn from(err: SignedPayloadStreamError) -> Self { + match err { + SignedPayloadStreamError::Stream(e) => e, + SignedPayloadStreamError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } + SignedPayloadStreamError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: payload::Error) -> Self { + Self::message(err.description()) + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: nom::error::Error) -> Self { + Self::message(err.code.description()) + } +} + +struct SignedPayload { + header: payload::Header, + data: Bytes, +} + +#[pin_project::pin_project] +pub struct SignedPayloadStream +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + datetime: DateTime, + scope: String, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +impl SignedPayloadStream +where + S: Stream>, +{ + pub fn new( + stream: S, + signing_hmac: HmacSha256, + datetime: DateTime, + scope: &str, + seed_signature: Hash, + ) -> Result { + Ok(Self { + stream, + buf: bytes::BytesMut::new(), + datetime, + scope: scope.into(), + signing_hmac, + previous_signature: seed_signature, + }) + } + + fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + return Ok(( + input, + SignedPayload { + header, + data: Bytes::new(), + }, + )); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, SignedPayload { header, data })) + } +} + +impl Stream for SignedPayloadStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + let mut this = self.project(); + + loop { + let (input, payload) = match Self::parse_next(this.buf) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); + } + } + } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + // 0-sized chunk is the last + if payload.data.is_empty() { + return Poll::Ready(None); + } + + let data_sha256sum = sha256sum(&payload.data); + + let expected_signature = compute_streaming_payload_signature( + this.signing_hmac, + *this.datetime, + this.scope, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) + })?; + + if payload.header.signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); + } + + *this.buf = input.into(); + *this.previous_signature = payload.header.signature; + + return Poll::Ready(Some(Ok(payload.data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +#[cfg(test)] +mod tests { + use futures::prelude::*; + + use super::{SignedPayloadStream, SignedPayloadStreamError}; + + #[tokio::test] + async fn test_interrupted_signed_payload_stream() { + use chrono::{DateTime, Utc}; + + use garage_util::data::Hash; + + let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 + .unwrap() + .with_timezone(&Utc); + let secret_key = "test"; + let region = "test"; + let scope = crate::signature::compute_scope(&datetime, region); + let signing_hmac = + crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); + + let data: &[&[u8]] = &[b"1"]; + let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into()))); + + let seed_signature = Hash::default(); + + let mut stream = + SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap(); + + assert!(stream.try_next().await.is_err()); + match stream.try_next().await { + Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + item => panic!( + "Unexpected result, expected early EOF error, got {:?}", + item + ), + } + } +}