garage_api(fixup): Fix unlimited buffering
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Jill 2022-01-12 15:46:14 +01:00
parent 7e18d2543e
commit 4c57fb892b
Signed by: KokaKiwi
GPG Key ID: 09A5A2688F13FAC1
1 changed files with 59 additions and 46 deletions

View File

@ -135,6 +135,11 @@ impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
}
}
struct SignedPayload {
header: payload::Header,
data: Bytes,
}
#[pin_project::pin_project]
pub struct SignedPayloadStream<S>
where
@ -160,10 +165,6 @@ where
scope: &str,
seed_signature: Hash,
) -> Result<Self, Error> {
// let signing_hmac =
// super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3")
// .ok_or_internal_error("Could not compute signing HMAC")?;
Ok(Self {
stream,
buf: bytes::BytesMut::new(),
@ -173,6 +174,36 @@ where
previous_signature: seed_signature,
})
}
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
use nom::bytes::streaming::{tag, take};
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(nom::Err::convert)?
};
}
let (input, header) = try_parse!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
return Ok((
input,
SignedPayload {
header,
data: Bytes::new(),
},
));
}
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
Ok((input, SignedPayload { header, data }))
}
}
impl<S> Stream for SignedPayloadStream<S>
@ -187,60 +218,42 @@ where
) -> task::Poll<Option<Self::Item>> {
use std::task::Poll;
use nom::bytes::streaming::{tag, take};
let mut this = self.project();
macro_rules! try_parse {
($eof:expr, $expr:expr) => {
match $expr {
Ok(value) => Ok(value),
Err(nom::Err::Incomplete(_)) => {
if $eof {
loop {
let (input, payload) = match Self::parse_next(this.buf) {
Ok(res) => res,
Err(nom::Err::Incomplete(_)) => {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
continue;
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
}
None => {
if this.buf.is_empty() {
return Poll::Ready(None);
}
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
"Unexpected EOF",
))));
}
continue;
}
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e),
}?
};
}
loop {
let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
log::debug!("Received: {:?}", bytes);
this.buf.extend(bytes);
false
}
Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))),
None => {
log::debug!("Buf: {:?}", this.buf);
if this.buf.is_empty() {
return Poll::Ready(None);
}
true
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
return Poll::Ready(Some(Err(e)))
}
};
let input: &[u8] = this.buf;
let (input, header) = try_parse!(eof, payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
this.buf.clear();
if payload.data.is_empty() {
return Poll::Ready(None);
}
let (input, data) =
try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
let data_sha256sum = sha256sum(&data);
let data_sha256sum = sha256sum(&payload.data);
let expected_signature = compute_streaming_payload_signature(
this.signing_hmac,
@ -253,14 +266,14 @@ where
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
})?;
if header.signature != expected_signature {
if payload.header.signature != expected_signature {
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
}
*this.buf = input.into();
*this.previous_signature = header.signature;
*this.previous_signature = payload.header.signature;
return Poll::Ready(Some(Ok(data)));
return Poll::Ready(Some(Ok(payload.data)));
}
}