From e7e8ce73e357df4db839337d77ba3871efce4b25 Mon Sep 17 00:00:00 2001 From: Jill Date: Wed, 12 Jan 2022 15:46:14 +0100 Subject: [PATCH] garage_api(fixup): Fix unlimited buffering --- src/api/signature/streaming.rs | 105 ++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 46 deletions(-) diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 12bb80ab..17e0bd77 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -135,6 +135,11 @@ impl From> for SignedPayloadStreamError { } } +struct SignedPayload { + header: payload::Header, + data: Bytes, +} + #[pin_project::pin_project] pub struct SignedPayloadStream where @@ -160,10 +165,6 @@ where scope: &str, seed_signature: Hash, ) -> Result { - // let signing_hmac = - // super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") - // .ok_or_internal_error("Could not compute signing HMAC")?; - Ok(Self { stream, buf: bytes::BytesMut::new(), @@ -173,6 +174,36 @@ where previous_signature: seed_signature, }) } + + fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { + use nom::bytes::streaming::{tag, take}; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(nom::Err::convert)? + }; + } + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + return Ok(( + input, + SignedPayload { + header, + data: Bytes::new(), + }, + )); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + + Ok((input, SignedPayload { header, data })) + } } impl Stream for SignedPayloadStream @@ -187,60 +218,42 @@ where ) -> task::Poll> { use std::task::Poll; - use nom::bytes::streaming::{tag, take}; - let mut this = self.project(); - macro_rules! try_parse { - ($eof:expr, $expr:expr) => { - match $expr { - Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => { - if $eof { + loop { + let (input, payload) = match Self::parse_next(this.buf) { + Ok(res) => res, + Err(nom::Err::Incomplete(_)) => { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + continue; + } + Some(Err(e)) => { + return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))) + } + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( "Unexpected EOF", )))); } - continue; } - Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), - }? - }; - } - - loop { - let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - log::debug!("Received: {:?}", bytes); - this.buf.extend(bytes); - false } - Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), - None => { - log::debug!("Buf: {:?}", this.buf); - if this.buf.is_empty() { - return Poll::Ready(None); - } - true + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => { + return Poll::Ready(Some(Err(e))) } }; - let input: &[u8] = this.buf; - - let (input, header) = try_parse!(eof, payload::Header::parse(input)); - // 0-sized chunk is the last - if header.size == 0 { - this.buf.clear(); + if payload.data.is_empty() { return Poll::Ready(None); } - let (input, data) = - try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input)); - let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input)); - - let data = Bytes::from(data.to_vec()); - let data_sha256sum = sha256sum(&data); + let data_sha256sum = sha256sum(&payload.data); let expected_signature = compute_streaming_payload_signature( this.signing_hmac, @@ -253,14 +266,14 @@ where SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) })?; - if header.signature != expected_signature { + if payload.header.signature != expected_signature { return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); } *this.buf = input.into(); - *this.previous_signature = header.signature; + *this.previous_signature = payload.header.signature; - return Poll::Ready(Some(Ok(data))); + return Poll::Ready(Some(Ok(payload.data))); } }