use std::future::Future; use std::pin::Pin; use std::task::{self, Poll}; use futures::io::{AsyncRead, AsyncWrite}; use imap_codec::types::core::Tag; use tokio_tower::pipeline::Server as PipelineServer; use tower::Service; use super::pipeline::Connection; use super::Imap; use crate::proto::{Request, Response}; pub struct Connecting where C: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: Send + 'static, { pub state: Option>, pub protocol: Imap, } pub enum ConnectingState where C: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: Send + 'static, { Waiting { conn: Connection, service_fut: F, }, Ready { conn: Connection, service: S, }, Serving { server: PipelineServer, PipelineService>, }, Finished, } impl ConnectingState where C: AsyncRead + AsyncWrite + Unpin, F: Future> + Unpin, ME: std::fmt::Display, S: Service, S::Future: Send + 'static, S::Error: std::fmt::Display, { fn poll_new_state(self, cx: &mut task::Context) -> (Self, Option>) { match self { ConnectingState::Waiting { conn, mut service_fut, } => { let service = match Pin::new(&mut service_fut).poll(cx) { Poll::Ready(Ok(service)) => service, Poll::Ready(Err(err)) => { tracing::error!("Connection error: {}", err); return ( ConnectingState::Waiting { conn, service_fut }, Some(Poll::Ready(())), ); } Poll::Pending => { return ( ConnectingState::Waiting { conn, service_fut }, Some(Poll::Pending), ) } }; let mut conn = conn; // TODO: Properly handle server greeting { use futures::SinkExt; let greeting = Response::ok("Hello").unwrap(); // "Hello" is a valid // greeting conn.start_send_unpin((None, greeting)).unwrap(); } (ConnectingState::Ready { conn, service }, None) } ConnectingState::Ready { conn, service } => ( ConnectingState::Serving { server: PipelineServer::new(conn, PipelineService { inner: service }), }, None, ), ConnectingState::Serving { mut server } => match Pin::new(&mut server).poll(cx) { Poll::Ready(Ok(_)) => (ConnectingState::Finished, Some(Poll::Ready(()))), Poll::Ready(Err(err)) => { tracing::debug!("Connecting error: {}", err); (ConnectingState::Finished, Some(Poll::Ready(()))) } Poll::Pending => (ConnectingState::Serving { server }, Some(Poll::Pending)), }, ConnectingState::Finished => (self, Some(Poll::Ready(()))), } } } impl Future for Connecting where C: AsyncRead + AsyncWrite + Unpin, F: Future> + Unpin, ME: std::fmt::Display, S: Service, S::Future: Send + 'static, S::Error: std::fmt::Display, { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { loop { let state = self.as_mut().state.take().unwrap(); let (next, res) = state.poll_new_state(cx); self.state = Some(next); if let Some(res) = res { return res; } } } } impl Unpin for Connecting where C: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: Send + 'static, { } pub struct PipelineService { inner: S, } impl Service for PipelineService where S: Service, S::Future: Send + 'static, { type Response = (Option, S::Response); type Error = S::Error; type Future = futures::future::BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { self.inner.poll_ready(cx) } fn call(&mut self, req: Request) -> Self::Future { use futures::{FutureExt, TryFutureExt}; let tag = req.command.tag.clone(); self.inner.call(req).map_ok(|res| (Some(tag), res)).boxed() } }