From 1b3147d4936e15eee9f92699f12c490603d60721 Mon Sep 17 00:00:00 2001 From: KokaKiwi Date: Wed, 11 May 2022 19:15:12 +0200 Subject: [PATCH] refactor: Better connection handling --- src/server/conn.rs | 93 +++++++++++++++++++++++++++++++++++++++++ src/server/mod.rs | 94 ++---------------------------------------- src/server/pipeline.rs | 11 ++++- 3 files changed, 105 insertions(+), 93 deletions(-) create mode 100644 src/server/conn.rs diff --git a/src/server/conn.rs b/src/server/conn.rs new file mode 100644 index 0000000..669321d --- /dev/null +++ b/src/server/conn.rs @@ -0,0 +1,93 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +use futures::io::{AsyncRead, AsyncWrite}; +use tower::Service; + +use super::pipeline::Connection; +use super::Imap; +use crate::proto::{Request, Response}; + +#[pin_project::pin_project] +pub struct Connecting { + pub conn: Connection, + #[pin] + pub state: ConnectingState, + pub protocol: Imap, +} + +#[pin_project::pin_project(project = ConnectingStateProj)] +pub enum ConnectingState { + Waiting { + #[pin] + service_fut: F, + }, + Ready { + service: S, + }, +} + +impl Future for Connecting +where + C: AsyncRead + AsyncWrite + Unpin, + F: Future>, + ME: std::fmt::Display, + S: Service, + S::Error: std::fmt::Display, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + use tokio_tower::pipeline::Server as PipelineServer; + + let mut this = self.project(); + + loop { + let next = match this.state.as_mut().project() { + ConnectingStateProj::Waiting { service_fut } => { + let service = match futures::ready!(service_fut.poll(cx)) { + Ok(service) => service, + Err(err) => { + tracing::error!("Connection error: {}", err); + return Poll::Ready(()); + } + }; + + // TODO: Properly handle server greeting + { + use imap_codec::types::response::{Response, Status}; + + let status = match Status::ok(None, None, "Hello") { + Ok(status) => status, + Err(err) => { + tracing::error!("Connection error: {}", err); + return Poll::Ready(()); + } + }; + let res = Response::Status(status); + + if let Err(err) = this.conn.send(res) { + tracing::error!("Connection error: {}", err); + return Poll::Ready(()); + }; + } + + ConnectingState::Ready { service } + } + ConnectingStateProj::Ready { service } => { + let server = PipelineServer::new(this.conn, service); + futures::pin_mut!(server); + + return server.poll(cx).map(|res| { + if let Err(err) = res { + tracing::debug!("Connection error: {}", err); + } + }); + } + }; + + this.state.set(next); + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 1c5748b..1e0158b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,9 +7,11 @@ use imap_codec::types::response::Capability; use crate::proto::{Request, Response}; use accept::Accept; +use pipeline::Connection; pub use service::MakeServiceRef; pub mod accept; +mod conn; mod pipeline; mod service; @@ -70,7 +72,7 @@ where let service_fut = this.make_service.make_service_ref(&conn); tokio::task::spawn(conn::Connecting { - conn, + conn: Connection::new(conn), state: conn::ConnectingState::Waiting { service_fut }, protocol: this.protocol.clone(), }); @@ -115,93 +117,3 @@ impl Builder { } } } - -mod conn { - use std::future::Future; - use std::pin::Pin; - use std::task::{self, Poll}; - - use futures::io::{AsyncRead, AsyncWrite}; - use tower::Service; - - use super::Imap; - use crate::proto::{Request, Response}; - - #[pin_project::pin_project] - pub struct Connecting { - pub conn: C, - #[pin] - pub state: ConnectingState, - pub protocol: Imap, - } - - #[pin_project::pin_project(project = ConnectingStateProj)] - pub enum ConnectingState { - Waiting { - #[pin] - service_fut: F, - }, - Ready { - service: S, - }, - } - - impl Future for Connecting - where - C: AsyncRead + AsyncWrite + Unpin, - F: Future>, - ME: std::fmt::Display, - S: Service, - S::Error: std::fmt::Display, - { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - use tokio_tower::pipeline::Server as PipelineServer; - - use super::pipeline::Connection; - - let mut this = self.project(); - - loop { - let next = match this.state.as_mut().project() { - ConnectingStateProj::Waiting { service_fut } => { - let service = match futures::ready!(service_fut.poll(cx)) { - Ok(service) => service, - Err(err) => { - tracing::debug!("Connection error: {}", err); - return Poll::Ready(()); - } - }; - - ConnectingState::Ready { service } - } - ConnectingStateProj::Ready { service } => { - let mut conn = Connection::new(this.conn); - - // TODO: Properly handle server greeting - { - use imap_codec::types::response::{Response, Status}; - - let status = Status::ok(None, None, "Hello").unwrap(); - let res = Response::Status(status); - - conn.send(res).unwrap(); - } - - let server = PipelineServer::new(conn, service); - futures::pin_mut!(server); - - if let Err(err) = futures::ready!(server.poll(cx)) { - tracing::debug!("Connection error: {}", err); - } - - return Poll::Ready(()); - } - }; - - this.state.set(next); - } - } - } -} diff --git a/src/server/pipeline.rs b/src/server/pipeline.rs index bf3d2d5..f284348 100644 --- a/src/server/pipeline.rs +++ b/src/server/pipeline.rs @@ -44,7 +44,13 @@ where Ok(res) => res, Err(e) if e.is_incomplete() => { let mut buf = [0u8; 256]; + let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; + tracing::trace!(read = read, "transport.poll_next"); + + if read == 0 { + return Poll::Ready(None); + } let data = &buf[..read]; this.read_buf.extend(data); @@ -55,7 +61,7 @@ where return Poll::Ready(Some(Err(format!("Error: {:?}", e).into()))); } }; - tracing::debug!("Received: {:#?}", command); + tracing::debug!(command = ?command, "transport.recv"); *this.read_buf = input.into(); @@ -74,6 +80,7 @@ where let mut this = self.project(); + tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer"); while !this.write_buf.is_empty() { let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?; this.write_buf.advance(written); @@ -88,7 +95,7 @@ where let mut writer = BufMut::writer(&mut self.write_buf); - tracing::debug!("Sending: {:#?}", item); + tracing::debug!(item = ?item, "transport.send"); item.encode(&mut writer)?; Ok(())