use std::pin::Pin; use std::task::{self, Poll}; use bytes::BytesMut; use futures::io::{AsyncRead, AsyncWrite}; use futures::sink::Sink; use futures::stream::Stream; use imap_codec::types::core::Tag; use crate::proto::{Request, Response}; type Error = tower::BoxError; type Result = std::result::Result; #[pin_project::pin_project] pub struct Connection { #[pin] conn: C, read_buf: BytesMut, write_buf: BytesMut, } impl Connection { pub fn new(conn: C) -> Self { Self { conn, read_buf: BytesMut::new(), write_buf: BytesMut::new(), } } } impl Stream for Connection where C: AsyncRead + Unpin, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { use imap_codec::parse::command::command as parse_command; let mut this = self.project(); loop { let (input, command) = match parse_command(this.read_buf) { Ok(res) => res, Err(e) if e.is_incomplete() => { let mut buf = [0u8; 256]; let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; tracing::trace!(read = read, "transport.poll_next"); if read == 0 { return Poll::Ready(None); } let data = &buf[..read]; this.read_buf.extend(data); continue; } Err(e) => { return Poll::Ready(Some(Err(format!("Error: {:?}", e).into()))); } }; tracing::debug!(command = ?command, "transport.recv"); *this.read_buf = input.into(); let req = command; return Poll::Ready(Some(Ok(req))); } } } impl Connection where C: AsyncWrite, { fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { use bytes::Buf; let mut this = self.project(); tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer"); while !this.write_buf.is_empty() { let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?; this.write_buf.advance(written); } Poll::Ready(Ok(())) } } impl Sink<(Option, Response)> for Connection where C: AsyncWrite + Unpin, { type Error = Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.poll_flush_buffer(cx))?; Poll::Ready(Ok(())) } fn start_send( self: Pin<&mut Self>, (tag, res): (Option, Response), ) -> Result<(), Self::Error> { use bytes::BufMut; use imap_codec::codec::Encode; debug_assert!(self.write_buf.is_empty()); let write_buf = &mut self.get_mut().write_buf; let mut writer = write_buf.writer(); let body = res.body.into_iter().flat_map(|body| body.into_data()); for data in body { data.encode(&mut writer)?; } res.status.into_imap(tag).encode(&mut writer)?; Ok(()) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.as_mut().poll_flush_buffer(cx))?; futures::ready!(self.project().conn.poll_flush(cx))?; Poll::Ready(Ok(())) } fn poll_close( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.as_mut().poll_flush_buffer(cx))?; futures::ready!(self.project().conn.poll_close(cx))?; Poll::Ready(Ok(())) } }