From 28ded269ea2545b98299afb3a3321d5fcaaff88a Mon Sep 17 00:00:00 2001 From: KokaKiwi Date: Mon, 9 May 2022 19:15:52 +0200 Subject: [PATCH] Initial commit. --- .editorconfig | 5 ++ .gitignore | 8 ++ Cargo.toml | 25 ++++++ LICENSE | 24 ++++++ examples/simple.rs | 32 ++++++++ rust-toolchain.toml | 3 + scripts/test_imap.py | 7 ++ src/lib.rs | 2 + src/proto/mod.rs | 5 ++ src/proto/req.rs | 3 + src/proto/res.rs | 1 + src/server/accept/addr.rs | 83 +++++++++++++++++++ src/server/accept/mod.rs | 79 ++++++++++++++++++ src/server/mod.rs | 168 ++++++++++++++++++++++++++++++++++++++ src/server/pipeline.rs | 136 ++++++++++++++++++++++++++++++ src/server/service.rs | 50 ++++++++++++ 16 files changed, 631 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 examples/simple.rs create mode 100644 rust-toolchain.toml create mode 100644 scripts/test_imap.py create mode 100644 src/lib.rs create mode 100644 src/proto/mod.rs create mode 100644 src/proto/req.rs create mode 100644 src/proto/res.rs create mode 100644 src/server/accept/addr.rs create mode 100644 src/server/accept/mod.rs create mode 100644 src/server/mod.rs create mode 100644 src/server/pipeline.rs create mode 100644 src/server/service.rs diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..eb6465a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,5 @@ +root = true + +[*.rs] +indent_style = space +indent_size = 4 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4f35a5b --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +# Rust stuff +/target +/Cargo.lock + +# Misc stuff +.* +!.editorconfig +!.gitignore diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9eefd0c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "boitalettres" +version = "0.0.1" +license = "BSD-3-Clause" +edition = "2021" + +[dependencies] +bytes = "1.1" +thiserror = "1.0" +tracing = "0.1" + +# IMAP +imap-codec = "0.5" + +# Async +async-compat = "0.2" +futures = "0.3" +pin-project = "1.0" +tokio = { version = "1.18", features = ["full"] } +tokio-tower = "0.6" +tower = { version = "0.4", features = ["full"] } + +[dev-dependencies] +eyre = "0.6" +tracing-subscriber = "0.3" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b02b2b0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2022 KokaKiwi + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. Neither the name of the author nor the names of its contributors may + be used to endorse or promote products derived from this software + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. diff --git a/examples/simple.rs b/examples/simple.rs new file mode 100644 index 0000000..d627a97 --- /dev/null +++ b/examples/simple.rs @@ -0,0 +1,32 @@ +use boitalettres::proto::{Request, Response}; +use boitalettres::server::accept::addr::{AddrIncoming, AddrStream}; +use boitalettres::server::Server; + +async fn handle_req(req: Request) -> eyre::Result { + use imap_codec::types::response::Status; + + tracing::debug!("Got request: {:#?}", req); + + Ok(Response::Status( + Status::ok(Some(req.tag), None, "Ok").map_err(|e| eyre::eyre!(e))?, + )) +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .init(); + + let incoming = AddrIncoming::new("127.0.0.1:4567").await?; + + let make_service = tower::service_fn(|addr: &AddrStream| { + tracing::debug!("Accept: {} -> {}", addr.remote_addr, addr.local_addr); + futures::future::ok::<_, std::convert::Infallible>(tower::service_fn(handle_req)) + }); + + let server = Server::new(incoming).serve(make_service); + let _ = server.await; + + Ok(()) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..ee144d5 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustc-dev", "rust-src"] diff --git a/scripts/test_imap.py b/scripts/test_imap.py new file mode 100644 index 0000000..f8bdf17 --- /dev/null +++ b/scripts/test_imap.py @@ -0,0 +1,7 @@ +#!/usr/bin/python +from imaplib import IMAP4 + +conn = IMAP4('127.0.0.1', port=4567) +conn.noop() + +conn.logout() diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c6eebbb --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,2 @@ +pub mod proto; +pub mod server; diff --git a/src/proto/mod.rs b/src/proto/mod.rs new file mode 100644 index 0000000..852952c --- /dev/null +++ b/src/proto/mod.rs @@ -0,0 +1,5 @@ +pub use self::req::Request; +pub use self::res::Response; + +pub mod req; +pub mod res; diff --git a/src/proto/req.rs b/src/proto/req.rs new file mode 100644 index 0000000..8f0bcc9 --- /dev/null +++ b/src/proto/req.rs @@ -0,0 +1,3 @@ +use imap_codec::types::command::Command; + +pub type Request = Command; diff --git a/src/proto/res.rs b/src/proto/res.rs new file mode 100644 index 0000000..1519ca5 --- /dev/null +++ b/src/proto/res.rs @@ -0,0 +1 @@ +pub type Response = imap_codec::types::response::Response; diff --git a/src/server/accept/addr.rs b/src/server/accept/addr.rs new file mode 100644 index 0000000..4f77a94 --- /dev/null +++ b/src/server/accept/addr.rs @@ -0,0 +1,83 @@ +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{self, Poll}; + +use async_compat::Compat as AsyncCompat; +use futures::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; + +use super::Accept; + +#[pin_project::pin_project] +pub struct AddrIncoming { + pub local_addr: SocketAddr, + #[pin] + listener: TcpListener, +} + +impl AddrIncoming { + pub async fn new(addr: impl ToSocketAddrs) -> std::io::Result { + let listener = TcpListener::bind(addr).await?; + let local_addr = listener.local_addr()?; + + Ok(Self { + local_addr, + listener, + }) + } +} + +impl Accept for AddrIncoming { + type Conn = AddrStream; + type Error = std::io::Error; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> task::Poll>> { + let this = self.project(); + + let (stream, remote_addr) = futures::ready!(this.listener.poll_accept(cx))?; + Poll::Ready(Some(Ok(AddrStream { + local_addr: *this.local_addr, + remote_addr, + stream: AsyncCompat::new(stream), + }))) + } +} + +#[pin_project::pin_project] +pub struct AddrStream { + pub local_addr: SocketAddr, + pub remote_addr: SocketAddr, + #[pin] + stream: AsyncCompat, +} + +impl AsyncRead for AddrStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.project().stream.poll_read(cx, buf) + } +} + +impl AsyncWrite for AddrStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().stream.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().stream.poll_close(cx) + } +} diff --git a/src/server/accept/mod.rs b/src/server/accept/mod.rs new file mode 100644 index 0000000..81e4c22 --- /dev/null +++ b/src/server/accept/mod.rs @@ -0,0 +1,79 @@ +use std::error::Error as StdError; +use std::pin::Pin; +use std::task; + +use futures::io; +use futures::Stream; + +pub mod addr; + +pub trait Accept { + type Conn: io::AsyncRead + io::AsyncWrite; + type Error: StdError; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> task::Poll>>; +} + +pub fn poll_fn(f: F) -> impl Accept +where + F: FnMut(&mut task::Context) -> task::Poll>>, + IO: io::AsyncRead + io::AsyncWrite, + E: StdError, +{ + struct PollFn(F); + + impl Unpin for PollFn {} + + impl Accept for PollFn + where + F: FnMut(&mut task::Context) -> task::Poll>>, + IO: io::AsyncRead + io::AsyncWrite, + E: StdError, + { + type Conn = IO; + type Error = E; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> task::Poll>> { + (self.get_mut().0)(cx) + } + } + + PollFn(f) +} + +pub fn from_stream(stream: S) -> impl Accept +where + S: Stream>, + IO: io::AsyncRead + io::AsyncWrite, + E: StdError, +{ + use pin_project::pin_project; + + #[pin_project] + struct FromStream(#[pin] S); + + impl Accept for FromStream + where + S: Stream>, + IO: io::AsyncRead + io::AsyncWrite, + E: StdError, + { + type Conn = IO; + type Error = E; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> task::Poll>> { + self.project().0.poll_next(cx) + } + } + + FromStream(stream) +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..694f1d2 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,168 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +use futures::io::{AsyncRead, AsyncWrite}; +use imap_codec::types::response::Capability; +use tower::Service; + +use crate::proto::{Request, Response}; +use accept::Accept; +pub use service::MakeServiceRef; + +pub mod accept; +mod pipeline; +mod service; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + Accept(#[source] A), + MakeService(#[source] Box), +} + +#[derive(Debug, Default, Clone)] +pub struct Imap { + pub capabilities: Vec, +} + +#[pin_project::pin_project] +pub struct Server { + #[pin] + incoming: I, + make_service: S, + protocol: Imap, +} + +impl Server +where + I: Accept, +{ + #[allow(clippy::new_ret_no_self)] + pub fn new(incoming: I) -> Builder { + Builder::new(incoming) + } +} + +impl Future for Server +where + I: Accept, + I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: MakeServiceRef, + S::MakeError: Into> + std::fmt::Display, + S::Error: std::fmt::Display, + S::Future: Send + 'static, +{ + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + loop { + let this = self.as_mut().project(); + + if let Some(conn) = futures::ready!(this.incoming.poll_accept(cx)) { + let conn = conn.map_err(Error::Accept)?; + + futures::ready!(this.make_service.poll_ready_ref(cx)) + .map_err(Into::into) + .map_err(Error::MakeService)?; + + let service_fut = this.make_service.make_service_ref(&conn); + + tokio::task::spawn(Connecting { + conn, + service_fut, + protocol: this.protocol.clone(), + }); + } else { + return Poll::Ready(Ok(())); + } + } + } +} + +pub struct Builder { + incoming: I, + protocol: Imap, +} + +#[allow(clippy::needless_update)] +impl Builder { + pub fn new(incoming: I) -> Self { + Self { + incoming, + protocol: Default::default(), + } + } + + pub fn capabilities(self, capabilities: impl IntoIterator) -> Self { + let protocol = Imap { + capabilities: capabilities.into_iter().collect(), + ..self.protocol + }; + + Self { protocol, ..self } + } + + pub fn serve(self, make_service: S) -> Server + where + S: MakeServiceRef, + { + Server { + incoming: self.incoming, + make_service, + protocol: self.protocol, + } + } +} + +#[pin_project::pin_project] +struct Connecting { + conn: C, + #[pin] + service_fut: F, + protocol: Imap, +} + +impl Future for Connecting +where + C: AsyncRead + AsyncWrite + Unpin, + F: Future>, + ME: std::fmt::Display, + S: Service, + S::Error: std::fmt::Display, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + use tokio_tower::pipeline::Server as PipelineServer; + + use pipeline::Connection; + + let this = self.project(); + + let service = match futures::ready!(this.service_fut.poll(cx)) { + Ok(service) => service, + Err(err) => { + tracing::debug!("Connection error: {}", err); + return Poll::Ready(()); + } + }; + let mut conn = Connection::new(this.conn); + + // TODO: Properly handle server greeting + { + use imap_codec::types::response::{Response, Status}; + + let status = Status::ok(None, None, "Hello").unwrap(); + let res = Response::Status(status); + + conn.send(res).unwrap(); + } + + let mut server = PipelineServer::new(conn, service); + if let Err(err) = futures::ready!(Future::poll(Pin::new(&mut server), cx)) { + tracing::debug!("Connection error: {}", err); + } + + Poll::Ready(()) + } +} diff --git a/src/server/pipeline.rs b/src/server/pipeline.rs new file mode 100644 index 0000000..bf3d2d5 --- /dev/null +++ b/src/server/pipeline.rs @@ -0,0 +1,136 @@ +use std::pin::Pin; +use std::task::{self, Poll}; + +use bytes::BytesMut; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::sink::Sink; +use futures::stream::Stream; + +use crate::proto::{Request, Response}; + +type Error = Box; + +#[pin_project::pin_project] +pub struct Connection { + #[pin] + conn: C, + read_buf: BytesMut, + write_buf: BytesMut, +} + +impl Connection { + pub fn new(conn: C) -> Self { + Self { + conn, + read_buf: BytesMut::new(), + write_buf: BytesMut::new(), + } + } +} + +impl Stream for Connection +where + C: AsyncRead + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + use imap_codec::parse::command::command as parse_command; + + let mut this = self.project(); + + loop { + let (input, command) = match parse_command(this.read_buf) { + Ok(res) => res, + Err(e) if e.is_incomplete() => { + let mut buf = [0u8; 256]; + let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?; + + let data = &buf[..read]; + this.read_buf.extend(data); + + continue; + } + Err(e) => { + return Poll::Ready(Some(Err(format!("Error: {:?}", e).into()))); + } + }; + tracing::debug!("Received: {:#?}", command); + + *this.read_buf = input.into(); + + let req = command; + return Poll::Ready(Some(Ok(req))); + } + } +} + +impl Connection +where + C: AsyncWrite, +{ + fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { + use bytes::Buf; + + let mut this = self.project(); + + while !this.write_buf.is_empty() { + let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?; + this.write_buf.advance(written); + } + + Poll::Ready(Ok(())) + } + + pub(crate) fn send(&mut self, item: Response) -> Result<(), Error> { + use bytes::BufMut; + use imap_codec::codec::Encode; + + let mut writer = BufMut::writer(&mut self.write_buf); + + tracing::debug!("Sending: {:#?}", item); + item.encode(&mut writer)?; + + Ok(()) + } +} + +impl Sink for Connection +where + C: AsyncWrite + Unpin, +{ + type Error = Error; + + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + futures::ready!(self.poll_flush_buffer(cx))?; + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> { + debug_assert!(self.write_buf.is_empty()); + self.get_mut().send(item)?; + + Ok(()) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + futures::ready!(self.as_mut().poll_flush_buffer(cx))?; + futures::ready!(self.project().conn.poll_flush(cx))?; + Poll::Ready(Ok(())) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + futures::ready!(self.as_mut().poll_flush_buffer(cx))?; + futures::ready!(self.project().conn.poll_close(cx))?; + Poll::Ready(Ok(())) + } +} diff --git a/src/server/service.rs b/src/server/service.rs new file mode 100644 index 0000000..e931634 --- /dev/null +++ b/src/server/service.rs @@ -0,0 +1,50 @@ +use std::future::Future; +use std::task::Context; +use std::task::Poll; + +use tower::Service; + +pub trait MakeServiceRef: self::sealed::Sealed<(Target, Request)> { + type Response; + type Error; + type Service: Service; + + type MakeError; + type Future: Future>; + + fn poll_ready_ref(&mut self, cx: &mut Context) -> Poll>; + fn make_service_ref(&mut self, target: &Target) -> Self::Future; +} + +impl self::sealed::Sealed<(Target, Request)> for M +where + M: for<'a> Service<&'a Target, Response = S>, + S: Service, +{ +} + +impl MakeServiceRef for M +where + M: for<'a> Service<&'a Target, Response = S, Future = MF, Error = ME>, + MF: Future>, + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Service = S; + + type MakeError = ME; + type Future = MF; + + fn poll_ready_ref(&mut self, cx: &mut Context) -> Poll> { + self.poll_ready(cx) + } + + fn make_service_ref(&mut self, target: &Target) -> Self::Future { + self.call(target) + } +} + +mod sealed { + pub trait Sealed {} +}