diff --git a/Cargo.toml b/Cargo.toml index f87fbc2..6117541 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] bytes = "1.1" -miette = "4.7" +miette = "5.1" thiserror = "1.0" tracing = "0.1" @@ -18,6 +18,7 @@ imap-codec = "0.5" # Async async-compat = "0.2" +async-stream = "0.3" futures = "0.3" pin-project = "1.0" @@ -26,7 +27,7 @@ tokio-tower = "0.6" tower = { version = "0.4", features = ["full"] } [dev-dependencies] -miette = { version = "4.7", features = ["fancy"] } +miette = { version = "5.1", features = ["fancy"] } tracing-subscriber = "0.3" console-subscriber = "0.1" diff --git a/examples/simple.rs b/examples/simple.rs index c23f2a2..5bdd45c 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -4,13 +4,16 @@ use boitalettres::proto::{Request, Response}; use boitalettres::server::accept::addr::{AddrIncoming, AddrStream}; use boitalettres::server::Server; -async fn handle_req(req: Request) -> Result { - use imap_codec::types::response::{Capability, Data}; +async fn handle_req(_req: Request) -> Result { + use imap_codec::types::response::{Capability, Data as ImapData}; - tracing::debug!("Got request: {:#?}", req); + use boitalettres::proto::res::{body::Data, Status}; let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; - let body = vec![Data::Capability(capabilities)]; + let body: Vec = vec![ + Status::ok("Yeah")?.into(), + ImapData::Capability(capabilities).into(), + ]; Ok(Response::ok("Done")?.with_body(body)) } diff --git a/src/lib.rs b/src/lib.rs index d8c4fb1..96e8fcf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ pub mod errors; pub mod proto; pub mod server; +mod util; diff --git a/src/proto/req/mod.rs b/src/proto/req/mod.rs index 0f814d5..86838e6 100644 --- a/src/proto/req/mod.rs +++ b/src/proto/req/mod.rs @@ -1,5 +1,6 @@ use imap_codec::types::command::Command; +#[derive(Debug)] pub struct Request { pub command: Command, } diff --git a/src/proto/res/body.rs b/src/proto/res/body.rs index 6a3c589..048a90c 100644 --- a/src/proto/res/body.rs +++ b/src/proto/res/body.rs @@ -1,17 +1,28 @@ -use std::io; +use std::{fmt, io}; +use futures::prelude::*; +use futures::stream::BoxStream; use imap_codec::codec::Encode; use imap_codec::types::response::{Data as ImapData, Status as ImapStatus}; -#[derive(Debug)] +use super::Status; + pub enum Body { Once(Vec), + Stream(BoxStream<'static, Data>), } impl Body { - pub(crate) fn into_data(self) -> Vec { + pub fn from_stream + Send + 'static>(stream: St) -> Self { + Body::Stream(stream.boxed()) + } +} + +impl Body { + pub(crate) fn into_stream(self) -> BoxStream<'static, Data> { match self { - Body::Once(data) => data, + Body::Once(data) => futures::stream::iter(data).boxed(), + Body::Stream(stream) => stream, } } } @@ -46,6 +57,15 @@ impl From for Body { } } +impl fmt::Debug for Body { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Body::Once(ref data) => f.debug_struct("Body::Once").field("data", data).finish(), + Body::Stream(_) => f.debug_struct("Body::Stream").finish_non_exhaustive(), + } + } +} + #[derive(Debug, Clone)] pub enum Data { Data(ImapData), @@ -72,3 +92,9 @@ impl From for Data { Data::Status(status) } } + +impl From for Data { + fn from(status: Status) -> Self { + status.into_imap(None).into() + } +} diff --git a/src/proto/res/mod.rs b/src/proto/res/mod.rs index 96b92ee..e970d51 100644 --- a/src/proto/res/mod.rs +++ b/src/proto/res/mod.rs @@ -7,6 +7,7 @@ use self::body::Body; use crate::errors::{Error, Result}; pub mod body; +pub(crate) mod stream; #[derive(Debug)] pub struct Response { @@ -51,6 +52,12 @@ impl Response { } } +impl Response { + pub fn split(self) -> (Option, Status) { + (self.body, self.status) + } +} + #[derive(Debug, Clone)] pub struct Status { pub(crate) code: StatusCode, @@ -76,6 +83,22 @@ impl Status { }) } + pub fn ok(msg: &str) -> Result { + Self::new(StatusCode::Ok, msg) + } + + pub fn no(msg: &str) -> Result { + Self::new(StatusCode::No, msg) + } + + pub fn bad(msg: &str) -> Result { + Self::new(StatusCode::Bad, msg) + } + + pub fn bye(msg: &str) -> Result { + Self::new(StatusCode::Bye, msg) + } + pub(crate) fn into_imap(self, tag: Option) -> ImapStatus { match self.code { StatusCode::Ok => ImapStatus::Ok { diff --git a/src/proto/res/stream.rs b/src/proto/res/stream.rs new file mode 100644 index 0000000..99a4f7a --- /dev/null +++ b/src/proto/res/stream.rs @@ -0,0 +1,23 @@ +use futures::prelude::*; +use imap_codec::types::core::Tag; + +use super::body::Data; +use super::Response; + +pub fn response_stream(res: Response, tag: Option) -> impl Stream { + let (body, status) = res.split(); + let body = body.map(|body| body.into_stream()); + + async_stream::stream! { + if let Some(body) = body { + for await item in body { + tracing::trace!(?item, "response_stream.yield"); + yield item; + } + } + + let item = status.into_imap(tag); + tracing::trace!(?item, "response_stream.yield"); + yield item.into(); + } +} diff --git a/src/server/pipeline.rs b/src/server/pipeline.rs index 3942bce..15ceab8 100644 --- a/src/server/pipeline.rs +++ b/src/server/pipeline.rs @@ -2,12 +2,13 @@ use std::pin::Pin; use std::task::{self, Poll}; use bytes::BytesMut; -use futures::io::{AsyncRead, AsyncWrite}; -use futures::sink::Sink; -use futures::stream::Stream; +use futures::prelude::*; +use futures::stream::BoxStream; use imap_codec::types::core::Tag; +use crate::proto::res::body::Data; use crate::proto::{Request, Response}; +use crate::util::stream::ConcatAll; type Error = tower::BoxError; type Result = std::result::Result; @@ -15,8 +16,12 @@ type Result = std::result::Result; #[pin_project::pin_project] pub struct Connection { #[pin] - conn: C, + pub conn: C, + read_buf: BytesMut, + + #[pin] + outbox: ConcatAll>, write_buf: BytesMut, } @@ -24,7 +29,10 @@ impl Connection { pub fn new(conn: C) -> Self { Self { conn, - read_buf: BytesMut::new(), + + read_buf: BytesMut::with_capacity(1024), + + outbox: ConcatAll::new(), write_buf: BytesMut::new(), } } @@ -47,7 +55,15 @@ where Err(e) if e.is_incomplete() => { let mut buf = [0u8; 256]; - let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; + tracing::trace!("transport.poll_read"); + // let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; + let read = match this.conn.as_mut().poll_read(cx, &mut buf) { + Poll::Ready(res) => res?, + Poll::Pending => { + tracing::trace!("transport.pending"); + return Poll::Pending; + } + }; tracing::trace!(read = read, "transport.poll_next"); if read == 0 { @@ -78,15 +94,29 @@ where C: AsyncWrite, { fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { - use bytes::Buf; + use bytes::{Buf, BufMut}; + use imap_codec::codec::Encode; let mut this = self.project(); + tracing::debug!(size = this.outbox.len(), "transport.flush_outbox"); + let mut writer = this.write_buf.writer(); + + while let Poll::Ready(Some(data)) = this.outbox.as_mut().poll_next(cx) { + tracing::trace!(?data, "transport.write_buf"); + + if let Err(err) = data.encode(&mut writer) { + tracing::error!(?err, "transport.encode_error"); + return Poll::Ready(Err(Box::new(err))); + } + } + tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer"); while !this.write_buf.is_empty() { let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?; this.write_buf.advance(written); } + this.write_buf.clear(); Poll::Ready(Ok(())) } @@ -107,23 +137,13 @@ where } fn start_send( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, (tag, res): (Option, Response), ) -> Result<(), Self::Error> { - use bytes::BufMut; - use imap_codec::codec::Encode; + use crate::proto::res::stream::response_stream; - debug_assert!(self.write_buf.is_empty()); - - let write_buf = &mut self.get_mut().write_buf; - let mut writer = write_buf.writer(); - - let body = res.body.into_iter().flat_map(|body| body.into_data()); - for data in body { - data.encode(&mut writer)?; - } - - res.status.into_imap(tag).encode(&mut writer)?; + tracing::debug!(?tag, ?res, "transport.start_send"); + self.outbox.push(Box::pin(response_stream(res, tag))); Ok(()) } diff --git a/src/util/buf.rs b/src/util/buf.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 0000000..c05035d --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,2 @@ +pub mod buf; +pub mod stream; diff --git a/src/util/stream.rs b/src/util/stream.rs new file mode 100644 index 0000000..5932a81 --- /dev/null +++ b/src/util/stream.rs @@ -0,0 +1,54 @@ +use std::fmt; +use std::pin::Pin; +use std::task::{self, Poll}; + +use futures::prelude::*; +use futures::stream::{FuturesOrdered, Stream, StreamFuture}; + +/// [`SelectAll`](futures::stream::SelectAll) but ordered +#[pin_project::pin_project] +pub struct ConcatAll { + #[pin] + inner: FuturesOrdered>, +} + +impl ConcatAll { + pub fn new() -> Self { + Self { + inner: FuturesOrdered::new(), + } + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn push(&mut self, stream: St) { + use futures::StreamExt; + + self.inner.push(stream.into_future()); + } +} + +impl fmt::Debug for ConcatAll { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ConcatAll").finish() + } +} + +impl Stream for ConcatAll { + type Item = St::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + loop { + match futures::ready!(self.inner.poll_next_unpin(cx)) { + Some((Some(item), remaining)) => { + self.push(remaining); + return Poll::Ready(Some(item)); + } + Some((None, _)) => {} + _ => return Poll::Ready(None), + } + } + } +}