garage_api(fixup): Fix unlimited buffering

This commit is contained in:
Jill 2022-01-12 15:46:14 +01:00
parent 847b5ad407
commit e7e8ce73e3
Signed by: KokaKiwi
GPG key ID: 09A5A2688F13FAC1

View file

@ -135,6 +135,11 @@ impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
} }
} }
struct SignedPayload {
header: payload::Header,
data: Bytes,
}
#[pin_project::pin_project] #[pin_project::pin_project]
pub struct SignedPayloadStream<S> pub struct SignedPayloadStream<S>
where where
@ -160,10 +165,6 @@ where
scope: &str, scope: &str,
seed_signature: Hash, seed_signature: Hash,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
// let signing_hmac =
// super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3")
// .ok_or_internal_error("Could not compute signing HMAC")?;
Ok(Self { Ok(Self {
stream, stream,
buf: bytes::BytesMut::new(), buf: bytes::BytesMut::new(),
@ -173,6 +174,36 @@ where
previous_signature: seed_signature, previous_signature: seed_signature,
}) })
} }
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
use nom::bytes::streaming::{tag, take};
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(nom::Err::convert)?
};
}
let (input, header) = try_parse!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
return Ok((
input,
SignedPayload {
header,
data: Bytes::new(),
},
));
}
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
Ok((input, SignedPayload { header, data }))
}
} }
impl<S> Stream for SignedPayloadStream<S> impl<S> Stream for SignedPayloadStream<S>
@ -187,60 +218,42 @@ where
) -> task::Poll<Option<Self::Item>> { ) -> task::Poll<Option<Self::Item>> {
use std::task::Poll; use std::task::Poll;
use nom::bytes::streaming::{tag, take};
let mut this = self.project(); let mut this = self.project();
macro_rules! try_parse { loop {
($eof:expr, $expr:expr) => { let (input, payload) = match Self::parse_next(this.buf) {
match $expr { Ok(res) => res,
Ok(value) => Ok(value),
Err(nom::Err::Incomplete(_)) => { Err(nom::Err::Incomplete(_)) => {
if $eof { match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
continue;
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
}
None => {
if this.buf.is_empty() {
return Poll::Ready(None);
}
return Poll::Ready(Some(Err(SignedPayloadStreamError::message( return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
"Unexpected EOF", "Unexpected EOF",
)))); ))));
} }
continue;
} }
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e),
}?
};
} }
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
loop { return Poll::Ready(Some(Err(e)))
let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
log::debug!("Received: {:?}", bytes);
this.buf.extend(bytes);
false
}
Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))),
None => {
log::debug!("Buf: {:?}", this.buf);
if this.buf.is_empty() {
return Poll::Ready(None);
}
true
} }
}; };
let input: &[u8] = this.buf;
let (input, header) = try_parse!(eof, payload::Header::parse(input));
// 0-sized chunk is the last // 0-sized chunk is the last
if header.size == 0 { if payload.data.is_empty() {
this.buf.clear();
return Poll::Ready(None); return Poll::Ready(None);
} }
let (input, data) = let data_sha256sum = sha256sum(&payload.data);
try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
let data_sha256sum = sha256sum(&data);
let expected_signature = compute_streaming_payload_signature( let expected_signature = compute_streaming_payload_signature(
this.signing_hmac, this.signing_hmac,
@ -253,14 +266,14 @@ where
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
})?; })?;
if header.signature != expected_signature { if payload.header.signature != expected_signature {
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
} }
*this.buf = input.into(); *this.buf = input.into();
*this.previous_signature = header.signature; *this.previous_signature = payload.header.signature;
return Poll::Ready(Some(Ok(data))); return Poll::Ready(Some(Ok(payload.data)));
} }
} }