diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index a2744316..12bb80ab 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -192,35 +192,42 @@ where let mut this = self.project(); macro_rules! try_parse { - ($expr:expr) => { + ($eof:expr, $expr:expr) => { match $expr { Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Incomplete(_)) => { + if $eof { + return Poll::Ready(Some(Err(SignedPayloadStreamError::message( + "Unexpected EOF", + )))); + } + continue; + } Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), }? }; } loop { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { + let eof = match futures::ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { + log::debug!("Received: {:?}", bytes); this.buf.extend(bytes); + false } Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), None => { + log::debug!("Buf: {:?}", this.buf); if this.buf.is_empty() { return Poll::Ready(None); - } else { - return Poll::Ready(Some(Err(SignedPayloadStreamError::message( - "Unexpected EOF", - )))); } + true } - } + }; let input: &[u8] = this.buf; - let (input, header) = try_parse!(payload::Header::parse(input)); + let (input, header) = try_parse!(eof, payload::Header::parse(input)); // 0-sized chunk is the last if header.size == 0 { @@ -228,8 +235,9 @@ where return Poll::Ready(None); } - let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); - let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + let (input, data) = + try_parse!(eof, take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(eof, tag::<_, _, nom::error::Error<_>>("\r\n")(input)); let data = Bytes::from(data.to_vec()); let data_sha256sum = sha256sum(&data);