diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index 98079ffb..b8a5e66d 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -25,53 +25,60 @@ pub fn parse_streaming_body( ) -> Result, Error> { match checked_signature.content_sha256_header { ContentSha256Header::StreamingPayload { signed, trailer } => { - if trailer { + if !signed && !trailer { return Err(Error::bad_request( - "STREAMING-*-TRAILER is not supported by Garage", - )); - } - if !signed { - return Err(Error::bad_request( - "STREAMING-UNSIGNED-PAYLOAD-* is not supported by Garage", + "STREAMING-UNSIGNED-PAYLOAD is not a valid combination", )); } - let signature = checked_signature - .signature_header - .clone() - .ok_or_bad_request("No signature provided")?; - let signature = hex::decode(signature) - .ok() - .and_then(|bytes| Hash::try_from(&bytes)) - .ok_or_bad_request("Invalid signature")?; + let sign_params = if signed { + let signature = checked_signature + .signature_header + .clone() + .ok_or_bad_request("No signature provided")?; + let signature = hex::decode(signature) + .ok() + .and_then(|bytes| Hash::try_from(&bytes)) + .ok_or_bad_request("Invalid signature")?; - let secret_key = checked_signature - .key - .as_ref() - .ok_or_bad_request("Cannot sign streaming payload without signing key")? - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key - .to_string(); + let secret_key = checked_signature + .key + .as_ref() + .ok_or_bad_request("Cannot sign streaming payload without signing key")? + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key + .to_string(); - let date = req - .headers() - .get(X_AMZ_DATE) - .ok_or_bad_request("Missing X-Amz-Date field")? - .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime = Utc.from_utc_datetime(&date); + let date = req + .headers() + .get(X_AMZ_DATE) + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime = Utc.from_utc_datetime(&date); - let scope = compute_scope(&date, region, service); - let signing_hmac = crate::signature::signing_hmac(&date, &secret_key, region, service) - .ok_or_internal_error("Unable to build signing HMAC")?; + let scope = compute_scope(&date, region, service); + let signing_hmac = + crate::signature::signing_hmac(&date, &secret_key, region, service) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Some(SignParams { + datetime: date, + scope, + signing_hmac, + previous_signature: signature, + }) + } else { + None + }; Ok(req.map(move |body| { let stream = body_stream::<_, Error>(body); let signed_payload_stream = - SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) + StreamingPayloadStream::new(stream, sign_params, trailer) .map(|x| x.map(hyper::body::Frame::data)) .map_err(Error::from); ReqBody::new(StreamBody::new(signed_payload_stream)) @@ -87,7 +94,7 @@ fn compute_streaming_payload_signature( scope: &str, previous_signature: Hash, content_sha256: Hash, -) -> Result { +) -> Result { let string_to_sign = [ AWS4_HMAC_SHA256_PAYLOAD, &date.format(LONG_DATETIME).to_string(), @@ -101,12 +108,47 @@ fn compute_streaming_payload_signature( let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?) + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) +} + +fn compute_streaming_trailer_signature( + signing_hmac: &HmacSha256, + date: DateTime, + scope: &str, + previous_signature: Hash, + trailer_sha256: Hash, +) -> Result { + let string_to_sign = [ + AWS4_HMAC_SHA256_PAYLOAD, + &date.format(LONG_DATETIME).to_string(), + scope, + &hex::encode(previous_signature), + &hex::encode(trailer_sha256), + ] + .join("\n"); + + let mut hmac = signing_hmac.clone(); + hmac.update(string_to_sign.as_bytes()); + + Hash::try_from(&hmac.finalize().into_bytes()) + .ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into())) } mod payload { use garage_util::data::Hash; + use nom::bytes::streaming::{tag, take_while}; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + pub enum Error { Parser(nom::error::Error), BadSignature, @@ -122,24 +164,13 @@ mod payload { } #[derive(Debug, Clone)] - pub struct Header { + pub struct ChunkHeader { pub size: usize, - pub signature: Hash, + pub signature: Option, } - impl Header { - pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { - use nom::bytes::streaming::tag; - use nom::character::streaming::hex_digit1; - use nom::combinator::map_res; - use nom::number::streaming::hex_u32; - - macro_rules! try_parse { - ($expr:expr) => { - $expr.map_err(|e| e.map(Error::Parser))? - }; - } - + impl ChunkHeader { + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { let (input, size) = try_parse!(hex_u32(input)); let (input, _) = try_parse!(tag(";")(input)); @@ -149,96 +180,165 @@ mod payload { let (input, _) = try_parse!(tag("\r\n")(input)); - let header = Header { + let header = ChunkHeader { size: size as usize, - signature, + signature: Some(signature), + }; + + Ok((input, header)) + } + + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = ChunkHeader { + size: size as usize, + signature: None, }; Ok((input, header)) } } + + #[derive(Debug, Clone)] + pub struct TrailerChunk { + pub header_name: Vec, + pub header_value: Vec, + pub signature: Option, + } + + impl TrailerChunk { + fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, header_name) = try_parse!(take_while( + |c: u8| c.is_ascii_alphanumeric() || c == b'-' + )(input)); + let (input, _) = try_parse!(tag(b":")(input)); + let (input, header_value) = try_parse!(take_while( + |c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c) + )(input)); + let (input, _) = try_parse!(tag(b"\n")(input)); + + Ok(( + input, + TrailerChunk { + header_name: header_name.to_vec(), + header_value: header_value.to_vec(), + signature: None, + }, + )) + } + pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + + let (input, _) = try_parse!(tag(b"\r\n\r\n")(input)); + + Ok((input, trailer)) + } + pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + let (input, trailer) = Self::parse_content(input)?; + + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + let (input, _) = try_parse!(tag(b"\r\n")(input)); + + Ok(( + input, + TrailerChunk { + signature: Some(signature), + ..trailer + }, + )) + } + } } #[derive(Debug)] -pub enum SignedPayloadStreamError { +pub enum StreamingPayloadError { Stream(Error), InvalidSignature, Message(String), } -impl SignedPayloadStreamError { +impl StreamingPayloadError { fn message(msg: &str) -> Self { - SignedPayloadStreamError::Message(msg.into()) + StreamingPayloadError::Message(msg.into()) } } -impl From for Error { - fn from(err: SignedPayloadStreamError) -> Self { +impl From for Error { + fn from(err: StreamingPayloadError) -> Self { match err { - SignedPayloadStreamError::Stream(e) => e, - SignedPayloadStreamError::InvalidSignature => { + StreamingPayloadError::Stream(e) => e, + StreamingPayloadError::InvalidSignature => { Error::bad_request("Invalid payload signature") } - SignedPayloadStreamError::Message(e) => { + StreamingPayloadError::Message(e) => { Error::bad_request(format!("Chunk format error: {}", e)) } } } } -impl From> for SignedPayloadStreamError { +impl From> for StreamingPayloadError { fn from(err: payload::Error) -> Self { Self::message(err.description()) } } -impl From> for SignedPayloadStreamError { +impl From> for StreamingPayloadError { fn from(err: nom::error::Error) -> Self { Self::message(err.code.description()) } } -struct SignedPayload { - header: payload::Header, - data: Bytes, +enum StreamingPayloadChunk { + Chunk { + header: payload::ChunkHeader, + data: Bytes, + }, + Trailer(payload::TrailerChunk), } -#[pin_project::pin_project] -pub struct SignedPayloadStream -where - S: Stream>, -{ - #[pin] - stream: S, - buf: bytes::BytesMut, +struct SignParams { datetime: DateTime, scope: String, signing_hmac: HmacSha256, previous_signature: Hash, } -impl SignedPayloadStream +#[pin_project::pin_project] +pub struct StreamingPayloadStream where S: Stream>, { - pub fn new( - stream: S, - signing_hmac: HmacSha256, - datetime: DateTime, - scope: &str, - seed_signature: Hash, - ) -> Self { + #[pin] + stream: S, + buf: bytes::BytesMut, + signing: Option, + has_trailer: bool, +} + +impl StreamingPayloadStream +where + S: Stream>, +{ + fn new(stream: S, signing: Option, has_trailer: bool) -> Self { Self { stream, buf: bytes::BytesMut::new(), - datetime, - scope: scope.into(), - signing_hmac, - previous_signature: seed_signature, + signing, + has_trailer, } } - fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + fn parse_next( + input: &[u8], + is_signed: bool, + has_trailer: bool, + ) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> { use nom::bytes::streaming::{tag, take}; macro_rules! try_parse { @@ -247,17 +347,30 @@ where }; } - let (input, header) = try_parse!(payload::Header::parse(input)); + let (input, header) = if is_signed { + try_parse!(payload::ChunkHeader::parse_signed(input)) + } else { + try_parse!(payload::ChunkHeader::parse_unsigned(input)) + }; // 0-sized chunk is the last if header.size == 0 { - return Ok(( - input, - SignedPayload { - header, - data: Bytes::new(), - }, - )); + if has_trailer { + let (input, trailer) = if is_signed { + try_parse!(payload::TrailerChunk::parse_signed(input)) + } else { + try_parse!(payload::TrailerChunk::parse_unsigned(input)) + }; + return Ok((input, StreamingPayloadChunk::Trailer(trailer))); + } else { + return Ok(( + input, + StreamingPayloadChunk::Chunk { + header, + data: Bytes::new(), + }, + )); + } } let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); @@ -265,15 +378,15 @@ where let data = Bytes::from(data.to_vec()); - Ok((input, SignedPayload { header, data })) + Ok((input, StreamingPayloadChunk::Chunk { header, data })) } } -impl Stream for SignedPayloadStream +impl Stream for StreamingPayloadStream where S: Stream> + Unpin, { - type Item = Result; + type Item = Result; fn poll_next( self: Pin<&mut Self>, @@ -284,55 +397,92 @@ where let mut this = self.project(); loop { - let (input, payload) = match Self::parse_next(this.buf) { - Ok(res) => res, - Err(nom::Err::Incomplete(_)) => { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - this.buf.extend(bytes); - continue; - } - Some(Err(e)) => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) - } - None => { - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( - "Unexpected EOF", - )))); + let (input, payload) = + match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e)))) + } + None => { + return Poll::Ready(Some(Err(StreamingPayloadError::message( + "Unexpected EOF", + )))); + } } } + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) + } + }; + + match payload { + StreamingPayloadChunk::Chunk { data, header } => { + if let Some(signing) = this.signing.as_mut() { + let data_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_payload_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + data_sha256sum, + )?; + + if header.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); + } + + signing.previous_signature = header.signature.unwrap(); + } + + *this.buf = input.into(); + + // 0-sized chunk is the last + if data.is_empty() { + // if there was a trailer, it would have been returned by the parser + assert!(!*this.has_trailer); + return Poll::Ready(None); + } + + return Poll::Ready(Some(Ok(data))); } - Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { - return Poll::Ready(Some(Err(e))) + StreamingPayloadChunk::Trailer(trailer) => { + if let Some(signing) = this.signing.as_mut() { + let data = [ + &trailer.header_name[..], + &b":"[..], + &trailer.header_value[..], + &b"\n"[..], + ] + .concat(); + let trailer_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_trailer_signature( + &signing.signing_hmac, + signing.datetime, + &signing.scope, + signing.previous_signature, + trailer_sha256sum, + )?; + + if trailer.signature.unwrap() != expected_signature { + return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature))); + } + } + + *this.buf = input.into(); + + // TODO: handle trailer + + return Poll::Ready(None); } - }; - - // 0-sized chunk is the last - if payload.data.is_empty() { - return Poll::Ready(None); } - - let data_sha256sum = sha256sum(&payload.data); - - let expected_signature = compute_streaming_payload_signature( - this.signing_hmac, - *this.datetime, - this.scope, - *this.previous_signature, - data_sha256sum, - ) - .map_err(|e| { - SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) - })?; - - if payload.header.signature != expected_signature { - return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); - } - - *this.buf = input.into(); - *this.previous_signature = payload.header.signature; - - return Poll::Ready(Some(Ok(payload.data))); } } @@ -345,7 +495,7 @@ where mod tests { use futures::prelude::*; - use super::{SignedPayloadStream, SignedPayloadStreamError}; + use super::{SignParams, StreamingPayloadError, StreamingPayloadStream}; #[tokio::test] async fn test_interrupted_signed_payload_stream() { @@ -367,12 +517,20 @@ mod tests { let seed_signature = Hash::default(); - let mut stream = - SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature); + let mut stream = StreamingPayloadStream::new( + body, + Some(SignParams { + signing_hmac, + datetime, + scope, + previous_signature: seed_signature, + }), + false, + ); assert!(stream.try_next().await.is_err()); match stream.try_next().await { - Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {} + Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {} item => panic!( "Unexpected result, expected early EOF error, got {:?}", item