diff --git a/Cargo.lock b/Cargo.lock index 033d157f..6f4b6b42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,7 +433,9 @@ dependencies = [ "idna", "log", "md-5", + "nom", "percent-encoding", + "pin-project", "quick-xml", "roxmltree", "serde", @@ -967,6 +969,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "0.7.13" @@ -1011,6 +1019,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "nom" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" +dependencies = [ + "memchr", + "minimal-lexical", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1092,6 +1111,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" diff --git a/Cargo.nix b/Cargo.nix index 53e93c34..44aa42e8 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -671,7 +671,9 @@ in idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; }; md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; }; + nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; }; percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; }; + pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; }; quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; }; roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; }; serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; }; @@ -1321,6 +1323,16 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec { + name = "minimal-lexical"; + version = "0.2.1"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + }); + "registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec { name = "mio"; version = "0.7.13"; @@ -1381,6 +1393,25 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec { + name = "nom"; + version = "7.1.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; }; + features = builtins.concatLists [ + [ "alloc" ] + [ "default" ] + [ "std" ] + ]; + dependencies = { + memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; }; + minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; }; + }; + buildDependencies = { + version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec { name = "ntapi"; version = "0.3.6"; @@ -1490,6 +1521,28 @@ in src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; }; }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; }; + dependencies = { + pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; }; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec { + name = "pin-project-internal"; + version = "1.0.8"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; }; + dependencies = { + proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; }; + quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; }; + syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec { name = "pin-project-lite"; version = "0.2.7"; diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index ca4950a1..e93e5ec5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,10 +28,12 @@ hmac = "0.10" idna = "0.2" log = "0.4" md-5 = "0.9" +nom = "7.1" sha2 = "0.9" futures = "0.3" futures-util = "0.3" +pin-project = "1.0" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index d4bd454f..956a1989 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,7 +1,9 @@ use std::collections::{BTreeMap, VecDeque}; +use std::pin::Pin; use std::sync::Arc; -use futures::prelude::*; +use futures::task; +use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -27,7 +29,7 @@ pub async fn handle_put( req: Request, bucket_id: Uuid, key: &str, - content_sha256: Option, + mut content_sha256: Option, ) -> Result, Error> { // Generate identity of new version let version_uuid = gen_uuid(); @@ -41,10 +43,24 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }; + let payload_seed_signature = match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let content_sha256 = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + Some(content_sha256) + } + _ => None, + }; // Parse body of uploaded file let body = req.into_body(); + let body = match payload_seed_signature { + Some(_) => SignedPayloadChunker::new(body).map_err(Error::from).boxed(), + None => body.map_err(Error::from).boxed(), + }; + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); @@ -179,7 +195,7 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks, S: Stream> + Unpin>( +async fn read_and_put_blocks, S: Stream> + Unpin>( garage: &Garage, version: &Version, part_number: u64, @@ -207,9 +223,9 @@ async fn read_and_put_blocks, S: Stream, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> { + use nom::bytes::complete::tag; + use nom::character::complete::hex_digit1; + use nom::combinator::map_res; + use nom::number::complete::hex_u32; + + let (input, size) = hex_u32(input)?; + let (input, _) = tag(";")(input)?; + + let (input, _) = tag("chunk-signature=")(input)?; + let (input, data) = map_res(hex_digit1, hex::decode)(input)?; + + let (input, _) = tag("\r\n")(input)?; + + let header = Header { + size: size as usize, + signature: data.into_boxed_slice(), + }; + + Ok((input, header)) + } + } +} + +enum SignedPayloadChunkerError { + Stream(StreamE), + Message(String), +} + +impl From> for Error +where + StreamE: Into, +{ + fn from(err: SignedPayloadChunkerError) -> Self { + match err { + SignedPayloadChunkerError::Stream(e) => e.into(), + SignedPayloadChunkerError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadChunkerError { + fn from(err: nom::error::Error) -> Self { + Self::Message(err.code.description().into()) + } +} + +#[pin_project::pin_project] +struct SignedPayloadChunker +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, +} + +impl SignedPayloadChunker +where + S: Stream>, +{ + fn new(stream: S) -> Self { + Self { + stream, + buf: bytes::BytesMut::new(), + } + } +} + +impl Stream for SignedPayloadChunker +where + S: Stream> + Unpin, +{ + type Item = Result>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + use nom::bytes::complete::{tag, take}; + + let mut this = self.project(); + + macro_rules! parse_try { + ($expr:expr) => { + match $expr { + Ok(value) => value, + Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Error(e @ nom::error::Error { .. })) + | Err(nom::Err::Failure(e)) => return Poll::Ready(Some(Err(e.into()))), + } + }; + } + + loop { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e)))) + } + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + } + } + + let input: &[u8] = this.buf; + + let (input, header) = parse_try!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + this.buf.clear(); + return Poll::Ready(None); + } + + let (input, data) = parse_try!(take(header.size)(input)); + let (input, _) = parse_try!(tag("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + *this.buf = input.into(); + return Poll::Ready(Some(Ok(data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + struct StreamChunker>, E> { stream: S, read_all: bool, @@ -292,7 +454,7 @@ impl> + Unpin, E> StreamChunker { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); + self.buf.extend(bytes); } else { self.read_all = true; } diff --git a/src/api/signature.rs b/src/api/signature.rs index 311e6a9a..1128b36f 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -97,13 +97,13 @@ pub async fn check_signature( let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { None + } else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" { + let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?; + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?) } else { let bytes = hex::decode(authorization.content_sha256) .ok_or_bad_request("Invalid content sha256 hash")?; - Some( - Hash::try_from(&bytes[..]) - .ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?, - ) + Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?) }; Ok((key, content_sha256))