diff --git a/src/message.rs b/src/message.rs index 1834f28..ec9433a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -454,6 +454,8 @@ impl RespEnc { let msg_len = reader.read_u32().await?; let msg = reader.read_exact(msg_len as usize).await?; + reader.fill_buffer().await; + Ok(Self { msg, stream: Some(reader.into_stream()), diff --git a/src/stream.rs b/src/stream.rs index efa0ebc..05ee051 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -95,6 +95,26 @@ impl ByteStreamReader { fn try_get(&mut self, read_len: usize) -> Option { self.buf.take_exact(read_len) } + + fn add_stream_next(&mut self, packet: Option) { + match packet { + Some(Ok(slice)) => { + self.buf.extend(slice); + } + Some(Err(e)) => { + self.err = Some(e); + self.eos = true; + } + None => { + self.eos = true; + } + } + } + + pub async fn fill_buffer(&mut self) { + let packet = self.stream.next().await; + self.add_stream_next(packet); + } } pub enum ReadExactError { @@ -132,18 +152,8 @@ impl<'a> Future for ByteStreamReadExact<'a> { } } - match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { - Some(Ok(slice)) => { - this.reader.buf.extend(slice); - } - Some(Err(e)) => { - this.reader.err = Some(e); - this.reader.eos = true; - } - None => { - this.reader.eos = true; - } - } + let next_packet = futures::ready!(this.reader.stream.as_mut().poll_next(cx)); + this.reader.add_stream_next(next_packet); } } }