use std::pin::Pin; use std::task::{self, Poll}; use bytes::BytesMut; use futures::prelude::*; use futures::stream::BoxStream; use imap_codec::types::core::Tag; use crate::proto::res::body::Data; use crate::proto::{Request, Response}; use crate::util::stream::ConcatAll; type Error = tower::BoxError; type Result = std::result::Result; #[pin_project::pin_project] pub struct Connection { #[pin] pub conn: C, read_buf: BytesMut, #[pin] outbox: ConcatAll>, write_buf: BytesMut, close: bool, } impl Connection { pub fn new(conn: C) -> Self { Self { conn, read_buf: BytesMut::with_capacity(1024), outbox: ConcatAll::new(), write_buf: BytesMut::new(), close: false, } } } impl Stream for Connection where C: AsyncRead + Unpin, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { use imap_codec::parse::command::command as parse_command; let mut this = self.project(); loop { let (input, command) = match parse_command(this.read_buf) { Ok(res) => res, Err(e) if e.is_incomplete() => { let mut buf = [0u8; 256]; tracing::trace!("transport.poll_read"); // let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; let read = match this.conn.as_mut().poll_read(cx, &mut buf) { Poll::Ready(res) => res?, Poll::Pending => { tracing::trace!("transport.pending"); return Poll::Pending; } }; tracing::trace!(read = read, "transport.poll_next"); if read == 0 { return Poll::Ready(None); } let data = &buf[..read]; this.read_buf.extend(data); continue; } Err(e) => { return Poll::Ready(Some(Err(format!("Error: {:?}", e).into()))); } }; tracing::debug!(command = ?command, "transport.recv"); *this.read_buf = input.into(); let req = Request { command }; return Poll::Ready(Some(Ok(req))); } } } impl Connection where C: AsyncWrite, { fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { use bytes::{Buf, BufMut}; use imap_codec::codec::Encode; let mut this = self.project(); tracing::debug!(size = this.outbox.len(), "transport.flush_outbox"); let mut writer = this.write_buf.writer(); while let Poll::Ready(Some(data)) = this.outbox.as_mut().poll_next(cx) { tracing::trace!(?data, "transport.write_buf"); if let Data::Close = data { *this.close = true; } if let Err(err) = data.encode(&mut writer) { tracing::error!(?err, "transport.encode_error"); return Poll::Ready(Err(Box::new(err))); } } tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer"); while !this.write_buf.is_empty() { let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?; this.write_buf.advance(written); } this.write_buf.clear(); if *this.close { futures::ready!(this.conn.as_mut().poll_close(cx))?; } Poll::Ready(Ok(())) } } impl Sink<(Option, Response)> for Connection where C: AsyncWrite + Unpin, { type Error = Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.poll_flush_buffer(cx))?; Poll::Ready(Ok(())) } fn start_send( mut self: Pin<&mut Self>, (tag, res): (Option, Response), ) -> Result<(), Self::Error> { use crate::proto::res::stream::response_stream; tracing::debug!(?tag, ?res, "transport.start_send"); self.outbox.push(Box::pin(response_stream(res, tag))); Ok(()) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.as_mut().poll_flush_buffer(cx))?; futures::ready!(self.project().conn.poll_flush(cx))?; Poll::Ready(Ok(())) } fn poll_close( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { futures::ready!(self.as_mut().poll_flush_buffer(cx))?; futures::ready!(self.project().conn.poll_close(cx))?; Poll::Ready(Ok(())) } } impl Sink for Connection where Self: Sink<(Option, Response)>, { type Error = as Sink<(Option, Response)>>::Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { , Response)>>::poll_ready(self, cx) } fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> { , Response)>>::start_send(self, (None, item)) } fn poll_flush( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { , Response)>>::poll_flush(self, cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll> { , Response)>>::poll_close(self, cx) } }