diff --git a/src/message.rs b/src/message.rs index 56e6e8e..629992d 100644 --- a/src/message.rs +++ b/src/message.rs @@ -313,14 +313,13 @@ impl ReqEnc { buf.put(&self.telemetry_id[..]); buf.put_u32(self.msg.len() as u32); - buf.put(&self.msg[..]); let header = buf.freeze(); if let Some(stream) = self.stream { - Box::pin(futures::stream::once(async move { Ok(header) }).chain(stream)) + Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream)) } else { - Box::pin(futures::stream::once(async move { Ok(header) })) + Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)])) } } @@ -387,16 +386,14 @@ impl RespEnc { let mut buf = BytesMut::with_capacity(msg.len() + 8); buf.put_u8(0); - buf.put_u32(msg.len() as u32); - buf.put(&msg[..]); let header = buf.freeze(); if let Some(stream) = stream { - Box::pin(futures::stream::once(async move { Ok(header) }).chain(stream)) + Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream)) } else { - Box::pin(futures::stream::once(async move { Ok(header) })) + Box::pin(futures::stream::iter([Ok(header), Ok(msg)])) } } RespEnc::Error { code, message } => {