From 59c7ff0fec7fb635bf400e2bcdfd29770c534ea9 Mon Sep 17 00:00:00 2001 From: KokaKiwi Date: Tue, 10 May 2022 16:22:52 +0200 Subject: [PATCH] server: Fix connection double-polling --- src/server/mod.rs | 129 +++++++++++++++++++++++++++++----------------- 1 file changed, 83 insertions(+), 46 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 694f1d2..f8ab86a 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,7 +4,6 @@ use std::task::{self, Poll}; use futures::io::{AsyncRead, AsyncWrite}; use imap_codec::types::response::Capability; -use tower::Service; use crate::proto::{Request, Response}; use accept::Accept; @@ -51,6 +50,7 @@ where S::MakeError: Into> + std::fmt::Display, S::Error: std::fmt::Display, S::Future: Send + 'static, + S::Service: Send + 'static, { type Output = Result<(), Error>; @@ -67,9 +67,9 @@ where let service_fut = this.make_service.make_service_ref(&conn); - tokio::task::spawn(Connecting { + tokio::task::spawn(conn::Connecting { conn, - service_fut, + state: conn::ConnectingState::Waiting { service_fut }, protocol: this.protocol.clone(), }); } else { @@ -114,55 +114,92 @@ impl Builder { } } -#[pin_project::pin_project] -struct Connecting { - conn: C, - #[pin] - service_fut: F, - protocol: Imap, -} +mod conn { + use std::future::Future; + use std::pin::Pin; + use std::task::{self, Poll}; -impl Future for Connecting -where - C: AsyncRead + AsyncWrite + Unpin, - F: Future>, - ME: std::fmt::Display, - S: Service, - S::Error: std::fmt::Display, -{ - type Output = (); + use futures::io::{AsyncRead, AsyncWrite}; + use tower::Service; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - use tokio_tower::pipeline::Server as PipelineServer; + use super::Imap; + use crate::proto::{Request, Response}; - use pipeline::Connection; + #[pin_project::pin_project] + pub struct Connecting { + pub conn: C, + #[pin] + pub state: ConnectingState, + pub protocol: Imap, + } - let this = self.project(); + #[pin_project::pin_project(project = ConnectingStateProj)] + pub enum ConnectingState { + Waiting { + #[pin] + service_fut: F, + }, + Ready { + service: S, + }, + } - let service = match futures::ready!(this.service_fut.poll(cx)) { - Ok(service) => service, - Err(err) => { - tracing::debug!("Connection error: {}", err); - return Poll::Ready(()); + impl Future for Connecting + where + C: AsyncRead + AsyncWrite + Unpin, + F: Future>, + ME: std::fmt::Display, + S: Service, + S::Error: std::fmt::Display, + { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + use tokio_tower::pipeline::Server as PipelineServer; + + use super::pipeline::Connection; + + let mut this = self.project(); + + loop { + let next = match this.state.as_mut().project() { + ConnectingStateProj::Waiting { service_fut } => { + let service = match futures::ready!(service_fut.poll(cx)) { + Ok(service) => service, + Err(err) => { + tracing::debug!("Connection error: {}", err); + return Poll::Ready(()); + } + }; + + ConnectingState::Ready { service } + } + ConnectingStateProj::Ready { service } => { + let mut conn = Connection::new(this.conn); + + // TODO: Properly handle server greeting + { + use imap_codec::types::response::{Response, Status}; + + let status = Status::ok(None, None, "Hello").unwrap(); + let res = Response::Status(status); + + conn.send(res).unwrap(); + } + + let server = PipelineServer::new(conn, service); + futures::pin_mut!(server); + + if let Err(err) = futures::ready!(server.poll(cx)) { + tracing::debug!("Connection error: {}", err); + } + + return Poll::Ready(()); + } + }; + + this.state.set(next); } - }; - let mut conn = Connection::new(this.conn); - - // TODO: Properly handle server greeting - { - use imap_codec::types::response::{Response, Status}; - - let status = Status::ok(None, None, "Hello").unwrap(); - let res = Response::Status(status); - - conn.send(res).unwrap(); } - - let mut server = PipelineServer::new(conn, service); - if let Err(err) = futures::ready!(Future::poll(Pin::new(&mut server), cx)) { - tracing::debug!("Connection error: {}", err); - } - - Poll::Ready(()) } }