From 732b4d0b63b58e7b09267b3165c7713ccec23ee5 Mon Sep 17 00:00:00 2001 From: Jill Date: Wed, 17 Nov 2021 18:13:34 +0100 Subject: [PATCH 01/11] garage_api: Refactor BodyChunker for stream-composition --- src/api/s3_put.rs | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..d4bd454f 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,9 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use futures::prelude::*; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -44,7 +45,7 @@ pub async fn handle_put( // Parse body of uploaded file let body = req.into_body(); - let mut chunker = BodyChunker::new(body, garage.config.block_size); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +179,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks, S: Stream> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +206,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block, + put_curr_version_block, + chunker.next().map_err(Into::into), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,25 +270,26 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker>, E> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl> + Unpin, E> StreamChunker { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result>, GarageError> { + + async fn next(&mut self) -> Result>, E> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); self.buf.extend(&bytes[..]); @@ -292,6 +297,7 @@ impl BodyChunker { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +374,12 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size); let (object, version, first_block) = futures::try_join!( garage.object_table.get(&bucket_id, &key), garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + chunker.next().map_err(GarageError::from), )?; // Check object is valid and multipart block can be accepted -- 2.43.0 From f1bfc939aada51f0ab6b62d3c2b9d215e7d734a1 Mon Sep 17 00:00:00 2001 From: Jill Date: Fri, 26 Nov 2021 18:48:43 +0100 Subject: [PATCH 02/11] garage_api: Handle chunked PUT payload --- Cargo.lock | 39 ++++++++++ Cargo.nix | 53 +++++++++++++ src/api/Cargo.toml | 2 + src/api/s3_put.rs | 176 +++++++++++++++++++++++++++++++++++++++++-- src/api/signature.rs | 8 +- 5 files changed, 267 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 033d157f..6f4b6b42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,7 +433,9 @@ dependencies = [ "idna", "log", "md-5", + "nom", "percent-encoding", + "pin-project", "quick-xml", "roxmltree", "serde", @@ -967,6 +969,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.13" @@ -1011,6 +1019,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1092,6 +1111,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" diff --git a/Cargo.nix b/Cargo.nix index 53e93c34..44aa42e8 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -671,7 +671,9 @@ in idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; }; md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; }; + nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; }; percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; }; + pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; }; quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; }; roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; }; serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; }; @@ -1321,6 +1323,16 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec { + name = "minimal-lexical"; + version = "0.2.1"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + }); + "registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec { name = "mio"; version = "0.7.13"; @@ -1381,6 +1393,25 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec { + name = "nom"; + version = "7.1.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; }; + features = builtins.concatLists [ + [ "alloc" ] + [ "default" ] + [ "std" ] + ]; + dependencies = { + memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; }; + minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; }; + }; + buildDependencies = { + version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec { name = "ntapi"; version = "0.3.6"; @@ -1490,6 +1521,28 @@ in src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; }; }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; }; + dependencies = { + pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; }; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project-internal"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; }; + dependencies = { + proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; }; + quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; }; + syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec { name = "pin-project-lite"; version = "0.2.7"; diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index ca4950a1..e93e5ec5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,10 +28,12 @@ hmac = "0.10" idna = "0.2" log = "0.4" md-5 = "0.9" +nom = "7.1" sha2 = "0.9" futures = "0.3" futures-util = "0.3" +pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index d4bd454f..956a1989 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,7 +1,9 @@ use std::collections::{BTreeMap, VecDeque}; +use std::pin::Pin; use std::sync::Arc; -use futures::prelude::*; +use futures::task; +use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -27,7 +29,7 @@ pub async fn handle_put( req: Request, bucket_id: Uuid, key: &str, - content_sha256: Option, + mut content_sha256: Option, ) -> Result, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -41,10 +43,24 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file let body = req.into_body(); + let body = match payload_seed_signature { + Some(_) => SignedPayloadChunker::new(body).map_err(Error::from).boxed(), + None => body.map_err(Error::from).boxed(), + }; + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); @@ -179,7 +195,7 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks, S: Stream> + Unpin>( +async fn read_and_put_blocks, S: Stream> + Unpin>( garage: &Garage, version: &Version, part_number: u64, @@ -207,9 +223,9 @@ async fn read_and_put_blocks, S: Stream, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> { + use nom::bytes::complete::tag; + use nom::character::complete::hex_digit1; + use nom::combinator::map_res; + use nom::number::complete::hex_u32; + + let (input, size) = hex_u32(input)?; + let (input, _) = tag(";")(input)?; + + let (input, _) = tag("chunk-signature=")(input)?; + let (input, data) = map_res(hex_digit1, hex::decode)(input)?; + + let (input, _) = tag("\r\n")(input)?; + + let header = Header { + size: size as usize, + signature: data.into_boxed_slice(), + }; + + Ok((input, header)) + } + } +} + +enum SignedPayloadChunkerError { + Stream(StreamE), + Message(String), +} + +impl From> for Error +where + StreamE: Into, +{ + fn from(err: SignedPayloadChunkerError) -> Self { + match err { + SignedPayloadChunkerError::Stream(e) => e.into(), + SignedPayloadChunkerError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadChunkerError { + fn from(err: nom::error::Error) -> Self { + Self::Message(err.code.description().into()) + } +} + +#[pin_project::pin_project] +struct SignedPayloadChunker +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, +} + +impl SignedPayloadChunker +where + S: Stream>, +{ + fn new(stream: S) -> Self { + Self { + stream, + buf: bytes::BytesMut::new(), + } + } +} + +impl Stream for SignedPayloadChunker +where + S: Stream> + Unpin, +{ + type Item = Result>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + use nom::bytes::complete::{tag, take}; + + let mut this = self.project(); + + macro_rules! parse_try { + ($expr:expr) => { + match $expr { + Ok(value) => value, + Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Error(e @ nom::error::Error { .. })) + | Err(nom::Err::Failure(e)) => return Poll::Ready(Some(Err(e.into()))), + } + }; + } + + loop { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e)))) + } + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + } + } + + let input: &[u8] = this.buf; + + let (input, header) = parse_try!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + this.buf.clear(); + return Poll::Ready(None); + } + + let (input, data) = parse_try!(take(header.size)(input)); + let (input, _) = parse_try!(tag("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + *this.buf = input.into(); + return Poll::Ready(Some(Ok(data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + struct StreamChunker>, E> { stream: S, read_all: bool, @@ -292,7 +454,7 @@ impl> + Unpin, E> StreamChunker { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } diff --git a/src/api/signature.rs b/src/api/signature.rs index 311e6a9a..1128b36f 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -97,13 +97,13 @@ pub async fn check_signature( let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { None + } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) } else { let bytes = hex::decode(authorization.content_sha256) .ok_or_bad_request("Invalid content sha256 hash")?; - Some( - Hash::try_from(&bytes[..]) - .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, - ) + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) }; Ok((key, content_sha256)) -- 2.43.0 From 006e5cc231033a2dd5e21f6c76d1df0ad0092584 Mon Sep 17 00:00:00 2001 From: Jill Date: Tue, 14 Dec 2021 19:18:33 +0100 Subject: [PATCH 03/11] garage_api: Validate signature for chunked PUT payload --- src/api/api_server.rs | 6 +- src/api/s3_bucket.rs | 13 +- src/api/s3_delete.rs | 3 + src/api/s3_put.rs | 117 +++++++++++++++--- src/api/s3_website.rs | 3 + src/api/signature/mod.rs | 43 +++++++ .../{signature.rs => signature/payload.rs} | 44 +------ src/api/signature/streaming.rs | 44 +++++++ 8 files changed, 208 insertions(+), 65 deletions(-) create mode 100644 src/api/signature/mod.rs rename src/api/{signature.rs => signature/payload.rs} (84%) create mode 100644 src/api/signature/streaming.rs diff --git a/src/api/api_server.rs b/src/api/api_server.rs index a38a3c5b..81e52aa4 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -15,7 +15,7 @@ use garage_model::garage::Garage; use garage_model::key_table::Key; use crate::error::*; -use crate::signature::check_signature; +use crate::signature::payload::check_payload_signature; use crate::helpers::*; use crate::s3_bucket::*; @@ -90,7 +90,7 @@ async fn handler( } async fn handler_inner(garage: Arc, req: Request) -> Result, Error> { - let (api_key, content_sha256) = check_signature(&garage, &req).await?; + let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?; let authority = req .headers() @@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { - handle_put(garage, req, bucket_id, &key, content_sha256).await + handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await } Endpoint::AbortMultipartUpload { key, upload_id, .. } => { handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 494224c8..8a5407d3 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -120,7 +120,10 @@ pub async fn handle_create_bucket( bucket_name: String, ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd = parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?; @@ -320,7 +323,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), @@ -329,8 +332,8 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - - Europe + + Europe "# ), @@ -339,7 +342,7 @@ mod tests { assert_eq!( parse_create_bucket_xml( br#" - + "# ), diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 9e267490..93271579 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -80,6 +80,9 @@ pub async fn handle_delete_objects( req: Request, content_sha256: Option, ) -> Result, Error> { + let content_sha256 = + content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; + let body = hyper::body::to_bytes(req.into_body()).await?; verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 956a1989..00771638 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; +use chrono::{DateTime, NaiveDateTime, Utc}; use futures::task; use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; @@ -17,18 +18,21 @@ use garage_util::time::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; -use crate::signature::verify_signed_content; +use crate::signature::LONG_DATETIME; +use crate::signature::{streaming::check_streaming_payload_signature, verify_signed_content}; pub async fn handle_put( garage: Arc, req: Request, bucket_id: Uuid, key: &str, + api_key: &Key, mut content_sha256: Option, ) -> Result, Error> { // Generate identity of new version @@ -54,11 +58,29 @@ pub async fn handle_put( }; // Parse body of uploaded file - let body = req.into_body(); + let (head, body) = req.into_parts(); - let body = match payload_seed_signature { - Some(_) => SignedPayloadChunker::new(body).map_err(Error::from).boxed(), - None => body.map_err(Error::from).boxed(), + let body = if let Some(signature) = payload_seed_signature { + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = head + .headers + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = + NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; + let date: DateTime = DateTime::from_utc(date, Utc); + + SignedPayloadChunker::new(body, garage.clone(), date, secret_key, signature) + .map_err(Error::from) + .boxed() + } else { + body.map_err(Error::from).boxed() }; let mut chunker = StreamChunker::new(body, garage.config.block_size); @@ -287,7 +309,8 @@ async fn put_block_meta( } mod payload { - #[derive(Debug)] + use std::fmt; + pub struct Header { pub size: usize, pub signature: Box<[u8]>, @@ -295,10 +318,10 @@ mod payload { impl Header { pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> { - use nom::bytes::complete::tag; - use nom::character::complete::hex_digit1; + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; use nom::combinator::map_res; - use nom::number::complete::hex_u32; + use nom::number::streaming::hex_u32; let (input, size) = hex_u32(input)?; let (input, _) = tag(";")(input)?; @@ -316,13 +339,29 @@ mod payload { Ok((input, header)) } } + + impl fmt::Debug for Header { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Header") + .field("size", &self.size) + .field("signature", &hex::encode(&self.signature)) + .finish() + } + } } enum SignedPayloadChunkerError { Stream(StreamE), + InvalidSignature, Message(String), } +impl SignedPayloadChunkerError { + fn message(msg: &str) -> Self { + SignedPayloadChunkerError::Message(msg.into()) + } +} + impl From> for Error where StreamE: Into, @@ -330,6 +369,9 @@ where fn from(err: SignedPayloadChunkerError) -> Self { match err { SignedPayloadChunkerError::Stream(e) => e.into(), + SignedPayloadChunkerError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } SignedPayloadChunkerError::Message(e) => { Error::BadRequest(format!("Chunk format error: {}", e)) } @@ -339,7 +381,7 @@ where impl From> for SignedPayloadChunkerError { fn from(err: nom::error::Error) -> Self { - Self::Message(err.code.description().into()) + Self::message(err.code.description()) } } @@ -351,16 +393,30 @@ where #[pin] stream: S, buf: bytes::BytesMut, + garage: Arc, + datetime: DateTime, + secret_key: String, + previous_signature: Hash, } impl SignedPayloadChunker where S: Stream>, { - fn new(stream: S) -> Self { + fn new( + stream: S, + garage: Arc, + datetime: DateTime, + secret_key: &str, + seed_signature: Hash, + ) -> Self { Self { stream, buf: bytes::BytesMut::new(), + garage, + datetime, + secret_key: secret_key.into(), + previous_signature: seed_signature, } } } @@ -377,18 +433,18 @@ where ) -> task::Poll> { use std::task::Poll; - use nom::bytes::complete::{tag, take}; + use nom::bytes::streaming::{tag, take}; let mut this = self.project(); - macro_rules! parse_try { + macro_rules! try_parse { ($expr:expr) => { match $expr { - Ok(value) => value, + Ok(value) => Ok(value), Err(nom::Err::Incomplete(_)) => continue, Err(nom::Err::Error(e @ nom::error::Error { .. })) - | Err(nom::Err::Failure(e)) => return Poll::Ready(Some(Err(e.into()))), - } + | Err(nom::Err::Failure(e)) => Err(e), + }? }; } @@ -409,7 +465,9 @@ where let input: &[u8] = this.buf; - let (input, header) = parse_try!(payload::Header::parse(input)); + let (input, header) = try_parse!(payload::Header::parse(input)); + let signature = Hash::try_from(&*header.signature) + .ok_or_else(|| SignedPayloadChunkerError::message("Invalid signature"))?; // 0-sized chunk is the last if header.size == 0 { @@ -417,12 +475,30 @@ where return Poll::Ready(None); } - let (input, data) = parse_try!(take(header.size)(input)); - let (input, _) = parse_try!(tag("\r\n")(input)); + let (input, data) = try_parse!(take(header.size)(input)); + let (input, _) = try_parse!(tag("\r\n")(input)); let data = Bytes::from(data.to_vec()); + let data_sha256sum = sha256sum(&data); + + let expected_signature = check_streaming_payload_signature( + this.garage, + this.secret_key, + *this.datetime, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadChunkerError::Message(format!("Could not build signature: {}", e)) + })?; + + if signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadChunkerError::InvalidSignature))); + } *this.buf = input.into(); + *this.previous_signature = signature; + return Poll::Ready(Some(Ok(data))); } } @@ -611,6 +687,9 @@ pub async fn handle_complete_multipart_upload( upload_id: &str, content_sha256: Option, ) -> Result, Error> { + let content_sha256 = + content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; + let body = hyper::body::to_bytes(req.into_body()).await?; verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index 85d7c261..b662c0b5 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -43,6 +43,9 @@ pub async fn handle_put_website( req: Request, content_sha256: Option, ) -> Result, Error> { + let content_sha256 = + content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; + let body = hyper::body::to_bytes(req.into_body()).await?; verify_signed_content(content_sha256, &body[..])?; diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs new file mode 100644 index 00000000..45981df9 --- /dev/null +++ b/src/api/signature/mod.rs @@ -0,0 +1,43 @@ +use chrono::{DateTime, Utc}; +use hmac::{Hmac, Mac, NewMac}; +use sha2::Sha256; + +use garage_util::data::{sha256sum, Hash}; + +use crate::error::*; + +pub mod payload; +pub mod streaming; + +pub const SHORT_DATE: &str = "%Y%m%d"; +pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; + +type HmacSha256 = Hmac; + +pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> { + if expected_sha256 != sha256sum(body) { + return Err(Error::BadRequest( + "Request content hash does not match signed hash".to_string(), + )); + } + Ok(()) +} + +fn signing_hmac( + datetime: &DateTime, + secret_key: &str, + region: &str, + service: &str, +) -> Result { + let secret = String::from("AWS4") + secret_key; + let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); + let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + region_hmac.update(region.as_bytes()); + let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + service_hmac.update(service.as_bytes()); + let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + signing_hmac.update(b"aws4_request"); + let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + Ok(hmac) +} diff --git a/src/api/signature.rs b/src/api/signature/payload.rs similarity index 84% rename from src/api/signature.rs rename to src/api/signature/payload.rs index 1128b36f..b13819a8 100644 --- a/src/api/signature.rs +++ b/src/api/signature/payload.rs @@ -1,25 +1,23 @@ use std::collections::HashMap; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::Mac; use hyper::{Body, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; -use garage_util::data::{sha256sum, Hash}; +use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; +use super::signing_hmac; +use super::{LONG_DATETIME, SHORT_DATE}; + use crate::encoding::uri_encode; use crate::error::*; -const SHORT_DATE: &str = "%Y%m%d"; -const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; - -type HmacSha256 = Hmac; - -pub async fn check_signature( +pub async fn check_payload_signature( garage: &Garage, request: &Request, ) -> Result<(Key, Option), Error> { @@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime, scope_string: &str, canonical_req: & .join("\n") } -fn signing_hmac( - datetime: &DateTime, - secret_key: &str, - region: &str, - service: &str, -) -> Result { - let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; - date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; - region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; - service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; - signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; - Ok(hmac) -} - fn canonical_request( method: &Method, url_path: &str, @@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { "".to_string() } } - -pub fn verify_signed_content(content_sha256: Option, body: &[u8]) -> Result<(), Error> { - let expected_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - if expected_sha256 != sha256sum(body) { - return Err(Error::BadRequest( - "Request content hash does not match signed hash".to_string(), - )); - } - Ok(()) -} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs new file mode 100644 index 00000000..35c93f5a --- /dev/null +++ b/src/api/signature/streaming.rs @@ -0,0 +1,44 @@ +use chrono::{DateTime, Utc}; + +use garage_model::garage::Garage; +use garage_util::data::Hash; +use hmac::Mac; + +use super::signing_hmac; +use super::{LONG_DATETIME, SHORT_DATE}; + +use crate::error::*; + +/// Result of `sha256("")` +const EMPTY_STRING_HEX_DIGEST: &str = + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; + +pub fn check_streaming_payload_signature( + garage: &Garage, + secret_key: &str, + date: DateTime, + previous_signature: Hash, + content_sha256: Hash, +) -> Result { + let scope = format!( + "{}/{}/s3/aws4_request", + date.format(SHORT_DATE), + garage.config.s3_api.s3_region + ); + + let string_to_sign = [ + "AWS4-HMAC-SHA256-PAYLOAD", + &date.format(LONG_DATETIME).to_string(), + &scope, + &hex::encode(previous_signature), + EMPTY_STRING_HEX_DIGEST, + &hex::encode(content_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac(&date, secret_key, &garage.config.s3_api.s3_region, "s3") + .ok_or_internal_error("Unable to build signing HMAC")?; + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_bad_request("Invalid signature") +} -- 2.43.0 From f0cb931a45465bc137fd54e22822e6dc3bf32484 Mon Sep 17 00:00:00 2001 From: Jill Date: Wed, 15 Dec 2021 12:15:30 +0100 Subject: [PATCH 04/11] nix(garage_api): Add missing signature submodule --- default.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/default.nix b/default.nix index faf6f522..108d4804 100644 --- a/default.nix +++ b/default.nix @@ -76,7 +76,7 @@ in let */ ''^(src|tests)'' # fixed default ''.*\.(rs|toml)$'' # fixed default - ''^(crdt|replication|cli|helper)'' # our crate submodules + ''^(crdt|replication|cli|helper|signature)'' # our crate submodules ]; }; -- 2.43.0 From 2b034b7c4eb83a80cbcda9a36008f4d6a1c78abe Mon Sep 17 00:00:00 2001 From: Jill Date: Tue, 4 Jan 2022 13:21:33 +0100 Subject: [PATCH 05/11] garage_api(fixup): Fixups from reviews --- src/api/s3_delete.rs | 8 +- src/api/s3_put.rs | 242 ++++----------------------------- src/api/s3_website.rs | 8 +- src/api/signature/streaming.rs | 233 ++++++++++++++++++++++++++++++- 4 files changed, 260 insertions(+), 231 deletions(-) diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 93271579..b243d982 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -80,11 +80,11 @@ pub async fn handle_delete_objects( req: Request, content_sha256: Option, ) -> Result, Error> { - let content_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 00771638..7ebbdb12 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,9 +1,7 @@ use std::collections::{BTreeMap, VecDeque}; -use std::pin::Pin; use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, Utc}; -use futures::task; use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; use hyper::{Request, Response}; @@ -24,8 +22,9 @@ use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::verify_signed_content; use crate::signature::LONG_DATETIME; -use crate::signature::{streaming::check_streaming_payload_signature, verify_signed_content}; pub async fn handle_put( garage: Arc, @@ -59,6 +58,7 @@ pub async fn handle_put( // Parse body of uploaded file let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); let body = if let Some(signature) = payload_seed_signature { let secret_key = &api_key @@ -76,11 +76,11 @@ pub async fn handle_put( NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; let date: DateTime = DateTime::from_utc(date, Utc); - SignedPayloadChunker::new(body, garage.clone(), date, secret_key, signature) + SignedPayloadStream::new(body, garage.clone(), date, secret_key, signature)? .map_err(Error::from) .boxed() } else { - body.map_err(Error::from).boxed() + body.boxed() }; let mut chunker = StreamChunker::new(body, garage.config.block_size); @@ -217,13 +217,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks, S: Stream> + Unpin>( +async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut StreamChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -245,9 +245,9 @@ async fn read_and_put_blocks, S: Stream> loop { let (_, _, next_block) = futures::try_join!( - put_curr_block.map_err(Into::into), - put_curr_version_block.map_err(Into::into), - chunker.next().map_err(Into::into) + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), )?; if let Some(block) = next_block { md5hasher.update(&block[..]); @@ -308,214 +308,14 @@ async fn put_block_meta( Ok(()) } -mod payload { - use std::fmt; - - pub struct Header { - pub size: usize, - pub signature: Box<[u8]>, - } - - impl Header { - pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> { - use nom::bytes::streaming::tag; - use nom::character::streaming::hex_digit1; - use nom::combinator::map_res; - use nom::number::streaming::hex_u32; - - let (input, size) = hex_u32(input)?; - let (input, _) = tag(";")(input)?; - - let (input, _) = tag("chunk-signature=")(input)?; - let (input, data) = map_res(hex_digit1, hex::decode)(input)?; - - let (input, _) = tag("\r\n")(input)?; - - let header = Header { - size: size as usize, - signature: data.into_boxed_slice(), - }; - - Ok((input, header)) - } - } - - impl fmt::Debug for Header { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Header") - .field("size", &self.size) - .field("signature", &hex::encode(&self.signature)) - .finish() - } - } -} - -enum SignedPayloadChunkerError { - Stream(StreamE), - InvalidSignature, - Message(String), -} - -impl SignedPayloadChunkerError { - fn message(msg: &str) -> Self { - SignedPayloadChunkerError::Message(msg.into()) - } -} - -impl From> for Error -where - StreamE: Into, -{ - fn from(err: SignedPayloadChunkerError) -> Self { - match err { - SignedPayloadChunkerError::Stream(e) => e.into(), - SignedPayloadChunkerError::InvalidSignature => { - Error::BadRequest("Invalid payload signature".into()) - } - SignedPayloadChunkerError::Message(e) => { - Error::BadRequest(format!("Chunk format error: {}", e)) - } - } - } -} - -impl From> for SignedPayloadChunkerError { - fn from(err: nom::error::Error) -> Self { - Self::message(err.code.description()) - } -} - -#[pin_project::pin_project] -struct SignedPayloadChunker -where - S: Stream>, -{ - #[pin] - stream: S, - buf: bytes::BytesMut, - garage: Arc, - datetime: DateTime, - secret_key: String, - previous_signature: Hash, -} - -impl SignedPayloadChunker -where - S: Stream>, -{ - fn new( - stream: S, - garage: Arc, - datetime: DateTime, - secret_key: &str, - seed_signature: Hash, - ) -> Self { - Self { - stream, - buf: bytes::BytesMut::new(), - garage, - datetime, - secret_key: secret_key.into(), - previous_signature: seed_signature, - } - } -} - -impl Stream for SignedPayloadChunker -where - S: Stream> + Unpin, -{ - type Item = Result>; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll> { - use std::task::Poll; - - use nom::bytes::streaming::{tag, take}; - - let mut this = self.project(); - - macro_rules! try_parse { - ($expr:expr) => { - match $expr { - Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => continue, - Err(nom::Err::Error(e @ nom::error::Error { .. })) - | Err(nom::Err::Failure(e)) => Err(e), - }? - }; - } - - loop { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - this.buf.extend(bytes); - } - Some(Err(e)) => { - return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e)))) - } - None => { - if this.buf.is_empty() { - return Poll::Ready(None); - } - } - } - - let input: &[u8] = this.buf; - - let (input, header) = try_parse!(payload::Header::parse(input)); - let signature = Hash::try_from(&*header.signature) - .ok_or_else(|| SignedPayloadChunkerError::message("Invalid signature"))?; - - // 0-sized chunk is the last - if header.size == 0 { - this.buf.clear(); - return Poll::Ready(None); - } - - let (input, data) = try_parse!(take(header.size)(input)); - let (input, _) = try_parse!(tag("\r\n")(input)); - - let data = Bytes::from(data.to_vec()); - let data_sha256sum = sha256sum(&data); - - let expected_signature = check_streaming_payload_signature( - this.garage, - this.secret_key, - *this.datetime, - *this.previous_signature, - data_sha256sum, - ) - .map_err(|e| { - SignedPayloadChunkerError::Message(format!("Could not build signature: {}", e)) - })?; - - if signature != expected_signature { - return Poll::Ready(Some(Err(SignedPayloadChunkerError::InvalidSignature))); - } - - *this.buf = input.into(); - *this.previous_signature = signature; - - return Poll::Ready(Some(Ok(data))); - } - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -struct StreamChunker>, E> { +struct StreamChunker>> { stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl> + Unpin, E> StreamChunker { +impl> + Unpin> StreamChunker { fn new(stream: S, block_size: usize) -> Self { Self { stream, @@ -525,7 +325,7 @@ impl> + Unpin, E> StreamChunker { } } - async fn next(&mut self) -> Result>, E> { + async fn next(&mut self) -> Result>, Error> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; @@ -612,12 +412,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next().map_err(GarageError::from), + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index b662c0b5..ab95d0af 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -43,11 +43,11 @@ pub async fn handle_put_website( req: Request, content_sha256: Option, ) -> Result, Error> { - let content_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let mut bucket = garage .bucket_table diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 35c93f5a..00fc5572 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,10 +1,17 @@ +use std::pin::Pin; +use std::sync::Arc; + use chrono::{DateTime, Utc}; +use futures::prelude::*; +use futures::task; +use hyper::body::Bytes; use garage_model::garage::Garage; use garage_util::data::Hash; use hmac::Mac; -use super::signing_hmac; +use super::sha256sum; +use super::HmacSha256; use super::{LONG_DATETIME, SHORT_DATE}; use crate::error::*; @@ -13,9 +20,9 @@ use crate::error::*; const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; -pub fn check_streaming_payload_signature( +fn compute_streaming_payload_signature( garage: &Garage, - secret_key: &str, + signing_hmac: &HmacSha256, date: DateTime, previous_signature: Hash, content_sha256: Hash, @@ -36,9 +43,223 @@ pub fn check_streaming_payload_signature( ] .join("\n"); - let mut hmac = signing_hmac(&date, secret_key, &garage.config.s3_api.s3_region, "s3") - .ok_or_internal_error("Unable to build signing HMAC")?; + let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Hash::try_from(&hmac.finalize().into_bytes()).ok_or_bad_request("Invalid signature") + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") +} + +mod payload { + use garage_util::data::Hash; + + pub enum Error { + Parser(nom::error::Error), + BadSignature, + } + + impl Error { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct Header { + pub size: usize, + pub signature: Hash, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = Header { + size: size as usize, + signature, + }; + + Ok((input, header)) + } + } +} + +pub enum SignedPayloadStreamError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl SignedPayloadStreamError { + fn message(msg: &str) -> Self { + SignedPayloadStreamError::Message(msg.into()) + } +} + +impl From for Error { + fn from(err: SignedPayloadStreamError) -> Self { + match err { + SignedPayloadStreamError::Stream(e) => e, + SignedPayloadStreamError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } + SignedPayloadStreamError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: payload::Error) -> Self { + Self::message(err.description()) + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: nom::error::Error) -> Self { + Self::message(err.code.description()) + } +} + +#[pin_project::pin_project] +pub struct SignedPayloadStream +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + garage: Arc, + datetime: DateTime, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +impl SignedPayloadStream +where + S: Stream>, +{ + pub fn new( + stream: S, + garage: Arc, + datetime: DateTime, + secret_key: &str, + seed_signature: Hash, + ) -> Result { + let signing_hmac = + super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") + .ok_or_internal_error("Could not compute signing HMAC")?; + + Ok(Self { + stream, + buf: bytes::BytesMut::new(), + garage, + datetime, + signing_hmac, + previous_signature: seed_signature, + }) + } +} + +impl Stream for SignedPayloadStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + use nom::bytes::streaming::{tag, take}; + + let mut this = self.project(); + + macro_rules! try_parse { + ($expr:expr) => { + match $expr { + Ok(value) => Ok(value), + Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), + }? + }; + } + + loop { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + } + } + + let input: &[u8] = this.buf; + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + this.buf.clear(); + return Poll::Ready(None); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + let data_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_payload_signature( + this.garage, + this.signing_hmac, + *this.datetime, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) + })?; + + if header.signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); + } + + *this.buf = input.into(); + *this.previous_signature = header.signature; + + return Poll::Ready(Some(Ok(data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } } -- 2.43.0 From bf08e62ce081ecd6600a08d4ea4df3f21843a1b9 Mon Sep 17 00:00:00 2001 From: Jill Date: Mon, 10 Jan 2022 17:37:35 +0100 Subject: [PATCH 06/11] garage_api(fixup): Handle interrupted stream (unittest included) --- src/api/s3_put.rs | 13 +++++- src/api/signature/mod.rs | 6 ++- src/api/signature/streaming.rs | 75 +++++++++++++++++++++++++--------- 3 files changed, 72 insertions(+), 22 deletions(-) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 7ebbdb12..9fb5b826 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -23,8 +23,8 @@ use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; use crate::signature::streaming::SignedPayloadStream; -use crate::signature::verify_signed_content; use crate::signature::LONG_DATETIME; +use crate::signature::{compute_scope, verify_signed_content}; pub async fn handle_put( garage: Arc, @@ -76,7 +76,16 @@ pub async fn handle_put( NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; let date: DateTime = DateTime::from_utc(date, Utc); - SignedPayloadStream::new(body, garage.clone(), date, secret_key, signature)? + let scope = compute_scope(&date, &garage.config.s3_api.s3_region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + &garage.config.s3_api.s3_region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; + + SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)? .map_err(Error::from) .boxed() } else { diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index 45981df9..ebdee6da 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -23,7 +23,7 @@ pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), E Ok(()) } -fn signing_hmac( +pub fn signing_hmac( datetime: &DateTime, secret_key: &str, region: &str, @@ -41,3 +41,7 @@ fn signing_hmac( let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; Ok(hmac) } + +pub fn compute_scope(datetime: &DateTime, region: &str) -> String { + format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +} diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 00fc5572..a2744316 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,18 +1,16 @@ use std::pin::Pin; -use std::sync::Arc; use chrono::{DateTime, Utc}; use futures::prelude::*; use futures::task; use hyper::body::Bytes; -use garage_model::garage::Garage; use garage_util::data::Hash; use hmac::Mac; use super::sha256sum; use super::HmacSha256; -use super::{LONG_DATETIME, SHORT_DATE}; +use super::LONG_DATETIME; use crate::error::*; @@ -21,22 +19,16 @@ const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; fn compute_streaming_payload_signature( - garage: &Garage, signing_hmac: &HmacSha256, date: DateTime, + scope: &str, previous_signature: Hash, content_sha256: Hash, ) -> Result { - let scope = format!( - "{}/{}/s3/aws4_request", - date.format(SHORT_DATE), - garage.config.s3_api.s3_region - ); - let string_to_sign = [ "AWS4-HMAC-SHA256-PAYLOAD", &date.format(LONG_DATETIME).to_string(), - &scope, + scope, &hex::encode(previous_signature), EMPTY_STRING_HEX_DIGEST, &hex::encode(content_sha256), @@ -104,6 +96,7 @@ mod payload { } } +#[derive(Debug)] pub enum SignedPayloadStreamError { Stream(Error), InvalidSignature, @@ -150,8 +143,8 @@ where #[pin] stream: S, buf: bytes::BytesMut, - garage: Arc, datetime: DateTime, + scope: String, signing_hmac: HmacSha256, previous_signature: Hash, } @@ -162,20 +155,20 @@ where { pub fn new( stream: S, - garage: Arc, + signing_hmac: HmacSha256, datetime: DateTime, - secret_key: &str, + scope: &str, seed_signature: Hash, ) -> Result { - let signing_hmac = - super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") - .ok_or_internal_error("Could not compute signing HMAC")?; + // let signing_hmac = + // super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") + // .ok_or_internal_error("Could not compute signing HMAC")?; Ok(Self { stream, buf: bytes::BytesMut::new(), - garage, datetime, + scope: scope.into(), signing_hmac, previous_signature: seed_signature, }) @@ -217,6 +210,10 @@ where None => { if this.buf.is_empty() { return Poll::Ready(None); + } else { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); } } } @@ -238,9 +235,9 @@ where let data_sha256sum = sha256sum(&data); let expected_signature = compute_streaming_payload_signature( - this.garage, this.signing_hmac, *this.datetime, + this.scope, *this.previous_signature, data_sha256sum, ) @@ -263,3 +260,43 @@ where self.stream.size_hint() } } + +#[cfg(test)] +mod tests { + use futures::prelude::*; + + use super::{SignedPayloadStream, SignedPayloadStreamError}; + + #[tokio::test] + async fn test_interrupted_signed_payload_stream() { + use chrono::{DateTime, Utc}; + + use garage_util::data::Hash; + + let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0 + .unwrap() + .with_timezone(&Utc); + let secret_key = "test"; + let region = "test"; + let scope = crate::signature::compute_scope(&datetime, region); + let signing_hmac = + crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); + + let data: &[&[u8]] = &[b"1"]; + let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into()))); + + let seed_signature = Hash::default(); + + let mut stream = + SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap(); + + assert!(stream.try_next().await.is_err()); + match stream.try_next().await { + Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + item => panic!( + "Unexpected result, expected early EOF error, got {:?}", + item + ), + } + } +} -- 2.43.0 From 561f614b6eb5230ede5bc87734232af749541a62 Mon Sep 17 00:00:00 2001 From: Jill Date: Mon, 10 Jan 2022 17:39:54 +0100 Subject: [PATCH 07/11] ci: Use HTTP endpoint for minio --- script/dev-env-mc.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/dev-env-mc.sh b/script/dev-env-mc.sh index 98b63047..3f58b0a8 100644 --- a/script/dev-env-mc.sh +++ b/script/dev-env-mc.sh @@ -8,7 +8,7 @@ cat > /tmp/garage.mc/config.json < Date: Tue, 11 Jan 2022 16:07:36 +0100 Subject: [PATCH 08/11] garage_api(fixup): Fix unexpected EOF error --- src/api/signature/streaming.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index a2744316..12bb80ab 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -192,35 +192,42 @@ where let mut this = self.project(); macro_rules! try_parse { - ($expr:expr) => { + ($eof:expr, $expr:expr) => { match $expr { Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Incomplete(_)) => { + if $eof { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); + } + continue; + } Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), }? }; } loop { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { + let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { + log::debug!("Received: {:?}", bytes); this.buf.extend(bytes); + false } Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), None => { + log::debug!("Buf: {:?}", this.buf); if this.buf.is_empty() { return Poll::Ready(None); - } else { - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( - "Unexpected EOF", - )))); } + true } - } + }; let input: &[u8] = this.buf; - let (input, header) = try_parse!(payload::Header::parse(input)); + let (input, header) = try_parse!(eof, payload::Header::parse(input)); // 0-sized chunk is the last if header.size == 0 { @@ -228,8 +235,9 @@ where return Poll::Ready(None); } - let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); - let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + let (input, data) = + try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input)); let data = Bytes::from(data.to_vec()); let data_sha256sum = sha256sum(&data); -- 2.43.0 From e7e8ce73e357df4db839337d77ba3871efce4b25 Mon Sep 17 00:00:00 2001 From: Jill Date: Wed, 12 Jan 2022 15:46:14 +0100 Subject: [PATCH 09/11] garage_api(fixup): Fix unlimited buffering --- src/api/signature/streaming.rs | 105 ++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 46 deletions(-) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 12bb80ab..17e0bd77 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -135,6 +135,11 @@ impl From> for SignedPayloadStreamError { } } +struct SignedPayload { + header: payload::Header, + data: Bytes, +} + #[pin_project::pin_project] pub struct SignedPayloadStream where @@ -160,10 +165,6 @@ where scope: &str, seed_signature: Hash, ) -> Result { - // let signing_hmac = - // super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") - // .ok_or_internal_error("Could not compute signing HMAC")?; - Ok(Self { stream, buf: bytes::BytesMut::new(), @@ -173,6 +174,36 @@ where previous_signature: seed_signature, }) } + + fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + return Ok(( + input, + SignedPayload { + header, + data: Bytes::new(), + }, + )); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, SignedPayload { header, data })) + } } impl Stream for SignedPayloadStream @@ -187,60 +218,42 @@ where ) -> task::Poll> { use std::task::Poll; - use nom::bytes::streaming::{tag, take}; - let mut this = self.project(); - macro_rules! try_parse { - ($eof:expr, $expr:expr) => { - match $expr { - Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => { - if $eof { + loop { + let (input, payload) = match Self::parse_next(this.buf) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + } + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( "Unexpected EOF", )))); } - continue; } - Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), - }? - }; - } - - loop { - let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - log::debug!("Received: {:?}", bytes); - this.buf.extend(bytes); - false } - Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), - None => { - log::debug!("Buf: {:?}", this.buf); - if this.buf.is_empty() { - return Poll::Ready(None); - } - true + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) } }; - let input: &[u8] = this.buf; - - let (input, header) = try_parse!(eof, payload::Header::parse(input)); - // 0-sized chunk is the last - if header.size == 0 { - this.buf.clear(); + if payload.data.is_empty() { return Poll::Ready(None); } - let (input, data) = - try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input)); - let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input)); - - let data = Bytes::from(data.to_vec()); - let data_sha256sum = sha256sum(&data); + let data_sha256sum = sha256sum(&payload.data); let expected_signature = compute_streaming_payload_signature( this.signing_hmac, @@ -253,14 +266,14 @@ where SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) })?; - if header.signature != expected_signature { + if payload.header.signature != expected_signature { return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); } *this.buf = input.into(); - *this.previous_signature = header.signature; + *this.previous_signature = payload.header.signature; - return Poll::Ready(Some(Ok(data))); + return Poll::Ready(Some(Ok(payload.data))); } } -- 2.43.0 From b2eda2c13ec54a6b1dc772a3b30fd7b590c8e90a Mon Sep 17 00:00:00 2001 From: Jill Date: Thu, 13 Jan 2022 16:45:23 +0100 Subject: [PATCH 10/11] garage_api(fixup): Fix stream early ending edge-case --- src/api/signature/streaming.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 17e0bd77..b2dc1591 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -233,10 +233,6 @@ where return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) } None => { - if this.buf.is_empty() { - return Poll::Ready(None); - } - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( "Unexpected EOF", )))); -- 2.43.0 From 11a1f3f6cfb23254e5bd58c0752c2d042bb94cee Mon Sep 17 00:00:00 2001 From: Jill Date: Fri, 14 Jan 2022 14:09:18 +0100 Subject: [PATCH 11/11] garage_api(fixup): Verify Content-SHA256 header for multipart upload only when needed. --- src/api/s3_put.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 9fb5b826..4e85664b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -504,11 +504,11 @@ pub async fn handle_complete_multipart_upload( upload_id: &str, content_sha256: Option, ) -> Result, Error> { - let content_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml) -- 2.43.0