From 0d667a30301bec47c03314ff0e449a220ad3b913 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 2 Jan 2024 20:23:33 +0100 Subject: [PATCH] compile with imap-flow --- src/imap/command/anonymous.rs | 22 +-- src/imap/command/anystate.rs | 6 +- src/imap/command/authenticated.rs | 40 ++++-- src/imap/command/examined.rs | 16 +-- src/imap/command/selected.rs | 24 ++-- src/imap/flow.rs | 24 ++-- src/imap/mailbox_view.rs | 28 ++-- src/imap/mod.rs | 227 ++++++++++++++++++++---------- src/imap/response.rs | 6 +- src/imap/session.rs | 226 +++++++++-------------------- src/server.rs | 4 +- 11 files changed, 313 insertions(+), 310 deletions(-) diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs index 4de5fbd..fbd10e9 100644 --- a/src/imap/command/anonymous.rs +++ b/src/imap/command/anonymous.rs @@ -1,7 +1,6 @@ use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody}; -use imap_codec::imap_types::core::{AString, NonEmptyVec}; -use imap_codec::imap_types::response::{Capability, Data}; +use imap_codec::imap_types::core::AString; use imap_codec::imap_types::secret::Secret; use crate::imap::command::anystate; @@ -13,16 +12,16 @@ use crate::mail::user::User; //--- dispatching pub struct AnonymousContext<'a> { - pub req: &'a Command<'a>, + pub req: &'a Command<'static>, pub login_provider: &'a ArcLoginProvider, } -pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub async fn dispatch(ctx: AnonymousContext<'_>) -> Result<(Response<'static>, flow::Transition)> { match &ctx.req.body { // Any State CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()), CommandBody::Capability => anystate::capability(ctx.req.tag.clone()), - CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)), + CommandBody::Logout => anystate::logout(), // Specific to anonymous context (3 commands) CommandBody::Login { username, password } => ctx.login(username, password).await, @@ -39,22 +38,11 @@ pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, fl //--- Command controllers, private impl<'a> AnonymousContext<'a> { - async fn capability(self) -> Result<(Response<'a>, flow::Transition)> { - let capabilities: NonEmptyVec = - (vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?; - let res = Response::build() - .to_req(self.req) - .message("Server capabilities") - .data(Data::Capability(capabilities)) - .ok()?; - Ok((res, flow::Transition::None)) - } - async fn login( self, username: &AString<'a>, password: &Secret>, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let (u, p) = ( std::str::from_utf8(username.as_ref())?, std::str::from_utf8(password.declassify().as_ref())?, diff --git a/src/imap/command/anystate.rs b/src/imap/command/anystate.rs index ea3bc16..42fe645 100644 --- a/src/imap/command/anystate.rs +++ b/src/imap/command/anystate.rs @@ -5,7 +5,7 @@ use imap_codec::imap_types::response::{Capability, Data}; use crate::imap::flow; use crate::imap::response::Response; -pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub(crate) fn capability(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> { let capabilities: NonEmptyVec = (vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?; let res = Response::build() @@ -17,7 +17,7 @@ pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transi Ok((res, flow::Transition::None)) } -pub(crate) fn noop_nothing<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub(crate) fn noop_nothing(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build().tag(tag).message("Noop completed.").ok()?, flow::Transition::None, @@ -41,7 +41,7 @@ pub(crate) fn not_implemented<'a>( )) } -pub(crate) fn wrong_state<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub(crate) fn wrong_state(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() .tag(tag) diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index c9f9ff7..74ebbfa 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -21,18 +21,18 @@ use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW} use crate::mail::IMF; pub struct AuthenticatedContext<'a> { - pub req: &'a Command<'a>, + pub req: &'a Command<'static>, pub user: &'a Arc, } pub async fn dispatch<'a>( ctx: AuthenticatedContext<'a>, -) -> Result<(Response<'a>, flow::Transition)> { +) -> Result<(Response<'static>, flow::Transition)> { match &ctx.req.body { // Any state CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()), CommandBody::Capability => anystate::capability(ctx.req.tag.clone()), - CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)), + CommandBody::Logout => anystate::logout(), // Specific to this state (11 commands) CommandBody::Create { mailbox } => ctx.create(mailbox).await, @@ -68,7 +68,10 @@ pub async fn dispatch<'a>( // --- PRIVATE --- impl<'a> AuthenticatedContext<'a> { - async fn create(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> { + async fn create( + self, + mailbox: &MailboxCodec<'a>, + ) -> Result<(Response<'static>, flow::Transition)> { let name = match mailbox { MailboxCodec::Inbox => { return Ok(( @@ -100,7 +103,10 @@ impl<'a> AuthenticatedContext<'a> { } } - async fn delete(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> { + async fn delete( + self, + mailbox: &MailboxCodec<'a>, + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; match self.user.delete_mailbox(&name).await { @@ -125,7 +131,7 @@ impl<'a> AuthenticatedContext<'a> { self, from: &MailboxCodec<'a>, to: &MailboxCodec<'a>, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(from).try_into()?; let new_name: &str = MailboxName(to).try_into()?; @@ -152,7 +158,7 @@ impl<'a> AuthenticatedContext<'a> { reference: &MailboxCodec<'a>, mailbox_wildcard: &ListMailbox<'a>, is_lsub: bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let mbx_hier_delim: QuotedChar = QuotedChar::unvalidated(MBX_HIER_DELIM_RAW); let reference: &str = MailboxName(reference).try_into()?; @@ -259,9 +265,9 @@ impl<'a> AuthenticatedContext<'a> { async fn status( self, - mailbox: &MailboxCodec<'a>, + mailbox: &MailboxCodec<'static>, attributes: &[StatusDataItemName], - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(name).await?; let mb = match mb_opt { @@ -316,7 +322,7 @@ impl<'a> AuthenticatedContext<'a> { async fn subscribe( self, mailbox: &MailboxCodec<'a>, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; if self.user.has_mailbox(&name).await? { @@ -341,7 +347,7 @@ impl<'a> AuthenticatedContext<'a> { async fn unsubscribe( self, mailbox: &MailboxCodec<'a>, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; if self.user.has_mailbox(&name).await? { @@ -399,7 +405,10 @@ impl<'a> AuthenticatedContext<'a> { * TRACE END --- */ - async fn select(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> { + async fn select( + self, + mailbox: &MailboxCodec<'a>, + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -430,7 +439,10 @@ impl<'a> AuthenticatedContext<'a> { )) } - async fn examine(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> { + async fn examine( + self, + mailbox: &MailboxCodec<'a>, + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -468,7 +480,7 @@ impl<'a> AuthenticatedContext<'a> { flags: &[Flag<'a>], date: &Option, message: &Literal<'a>, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let append_tag = self.req.tag.clone(); match self.append_internal(mailbox, flags, date, message).await { Ok((_mb, uidvalidity, uid)) => Ok(( diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs index 7f9c39c..eec85cd 100644 --- a/src/imap/command/examined.rs +++ b/src/imap/command/examined.rs @@ -14,17 +14,17 @@ use crate::imap::response::Response; use crate::mail::user::User; pub struct ExaminedContext<'a> { - pub req: &'a Command<'a>, + pub req: &'a Command<'static>, pub user: &'a Arc, pub mailbox: &'a mut MailboxView, } -pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> { match &ctx.req.body { // Any State // noop is specific to this state CommandBody::Capability => anystate::capability(ctx.req.tag.clone()), - CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)), + CommandBody::Logout => anystate::logout(), // Specific to the EXAMINE state (specialization of the SELECTED state) // ~3 commands -> close, fetch, search + NOOP @@ -58,7 +58,7 @@ pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flo impl<'a> ExaminedContext<'a> { /// CLOSE in examined state is not the same as in selected state /// (in selected state it also does an EXPUNGE, here it doesn't) - async fn close(self) -> Result<(Response<'a>, flow::Transition)> { + async fn close(self) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() .to_req(self.req) @@ -71,9 +71,9 @@ impl<'a> ExaminedContext<'a> { pub async fn fetch( self, sequence_set: &SequenceSet, - attributes: &'a MacroOrMessageDataItemNames<'a>, + attributes: &'a MacroOrMessageDataItemNames<'static>, uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { match self.mailbox.fetch(sequence_set, attributes, uid).await { Ok(resp) => Ok(( Response::build() @@ -98,7 +98,7 @@ impl<'a> ExaminedContext<'a> { _charset: &Option>, _criteria: &SearchKey<'a>, _uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() .to_req(self.req) @@ -108,7 +108,7 @@ impl<'a> ExaminedContext<'a> { )) } - pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> { + pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { self.mailbox.mailbox.force_sync().await?; let updates = self.mailbox.update().await?; diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index cd5d221..d5dcd61 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -18,17 +18,19 @@ use crate::imap::response::Response; use crate::mail::user::User; pub struct SelectedContext<'a> { - pub req: &'a Command<'a>, + pub req: &'a Command<'static>, pub user: &'a Arc, pub mailbox: &'a mut MailboxView, } -pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flow::Transition)> { +pub async fn dispatch<'a>( + ctx: SelectedContext<'a>, +) -> Result<(Response<'static>, flow::Transition)> { match &ctx.req.body { // Any State // noop is specific to this state CommandBody::Capability => anystate::capability(ctx.req.tag.clone()), - CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)), + CommandBody::Logout => anystate::logout(), // Specific to this state (7 commands + NOOP) CommandBody::Close => ctx.close().await, @@ -65,7 +67,7 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flo // --- PRIVATE --- impl<'a> SelectedContext<'a> { - async fn close(self) -> Result<(Response<'a>, flow::Transition)> { + async fn close(self) -> Result<(Response<'static>, flow::Transition)> { // We expunge messages, // but we don't send the untagged EXPUNGE responses let tag = self.req.tag.clone(); @@ -79,9 +81,9 @@ impl<'a> SelectedContext<'a> { pub async fn fetch( self, sequence_set: &SequenceSet, - attributes: &'a MacroOrMessageDataItemNames<'a>, + attributes: &'a MacroOrMessageDataItemNames<'static>, uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { match self.mailbox.fetch(sequence_set, attributes, uid).await { Ok(resp) => Ok(( Response::build() @@ -106,7 +108,7 @@ impl<'a> SelectedContext<'a> { _charset: &Option>, _criteria: &SearchKey<'a>, _uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() .to_req(self.req) @@ -116,7 +118,7 @@ impl<'a> SelectedContext<'a> { )) } - pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> { + pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { self.mailbox.mailbox.force_sync().await?; let updates = self.mailbox.update().await?; @@ -130,7 +132,7 @@ impl<'a> SelectedContext<'a> { )) } - async fn expunge(self) -> Result<(Response<'a>, flow::Transition)> { + async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { let tag = self.req.tag.clone(); let data = self.mailbox.expunge().await?; @@ -151,7 +153,7 @@ impl<'a> SelectedContext<'a> { response: &StoreResponse, flags: &[Flag<'a>], uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let data = self .mailbox .store(sequence_set, kind, response, flags, uid) @@ -172,7 +174,7 @@ impl<'a> SelectedContext<'a> { sequence_set: &SequenceSet, mailbox: &MailboxCodec<'a>, uid: &bool, - ) -> Result<(Response<'a>, flow::Transition)> { + ) -> Result<(Response<'static>, flow::Transition)> { let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; diff --git a/src/imap/flow.rs b/src/imap/flow.rs index eb94bb5..95810c1 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -37,23 +37,27 @@ pub enum Transition { // See RFC3501 section 3. // https://datatracker.ietf.org/doc/html/rfc3501#page-13 impl State { - pub fn apply(self, tr: Transition) -> Result { - match (self, tr) { - (s, Transition::None) => Ok(s), - (State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)), + pub fn apply(&mut self, tr: Transition) -> Result<(), Error> { + let new_state = match (&self, tr) { + (_s, Transition::None) => return Ok(()), + (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), ( State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), Transition::Select(m), - ) => Ok(State::Selected(u, m)), + ) => State::Selected(u.clone(), m), ( State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), Transition::Examine(m), - ) => Ok(State::Examined(u, m)), + ) => State::Examined(u.clone(), m), (State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => { - Ok(State::Authenticated(u)) + State::Authenticated(u.clone()) } - (_, Transition::Logout) => Ok(State::Logout), - _ => Err(Error::ForbiddenTransition), - } + (_, Transition::Logout) => State::Logout, + _ => return Err(Error::ForbiddenTransition), + }; + + *self = new_state; + + Ok(()) } } diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 2e5444b..fd58de7 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -75,14 +75,26 @@ impl<'a> FetchedMail<'a> { } } -pub struct AttributesProxy<'a> { - attrs: Vec>, +pub struct AttributesProxy { + attrs: Vec>, } -impl<'a> AttributesProxy<'a> { - fn new(attrs: &'a MacroOrMessageDataItemNames<'a>, is_uid_fetch: bool) -> Self { +impl AttributesProxy { + fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self { // Expand macros let mut fetch_attrs = match attrs { - MacroOrMessageDataItemNames::Macro(m) => m.expand(), + MacroOrMessageDataItemNames::Macro(m) => { + use imap_codec::imap_types::fetch::Macro; + use MessageDataItemName::*; + match m { + Macro::All => vec![Flags, InternalDate, Rfc822Size, Envelope], + Macro::Fast => vec![Flags, InternalDate, Rfc822Size], + Macro::Full => vec![Flags, InternalDate, Rfc822Size, Envelope, Body], + _ => { + tracing::error!("unimplemented macro"); + vec![] + } + } + } MacroOrMessageDataItemNames::MessageDataItemNames(a) => a.clone(), }; @@ -248,7 +260,7 @@ impl<'a> MailView<'a> { Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt))) } - fn filter<'b>(&self, ap: &AttributesProxy<'b>) -> Result<(Body<'b>, SeenFlag)> { + fn filter<'b>(&self, ap: &AttributesProxy) -> Result<(Body<'static>, SeenFlag)> { let mut seen = SeenFlag::DoNothing; let res_attrs = ap .attrs @@ -593,9 +605,9 @@ impl MailboxView { pub async fn fetch<'b>( &self, sequence_set: &SequenceSet, - attributes: &'b MacroOrMessageDataItemNames<'b>, + attributes: &'b MacroOrMessageDataItemNames<'static>, is_uid_fetch: &bool, - ) -> Result>> { + ) -> Result>> { let ap = AttributesProxy::new(attributes, *is_uid_fetch); // Prepare data diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 589231b..31eeaa8 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -4,104 +4,183 @@ mod mailbox_view; mod response; mod session; -use std::task::{Context, Poll}; +use std::net::SocketAddr; use anyhow::Result; -//use boitalettres::errors::Error as BalError; -//use boitalettres::proto::{Request, Response}; -//use boitalettres::server::accept::addr::AddrIncoming; -//use boitalettres::server::accept::addr::AddrStream; -//use boitalettres::server::Server as ImapServer; -use futures::future::BoxFuture; -use futures::future::FutureExt; +use futures::stream::{FuturesUnordered, StreamExt}; + +use tokio::net::TcpListener; use tokio::sync::watch; +use imap_codec::imap_types::response::Greeting; +use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; +use imap_flow::stream::AnyStream; + use crate::config::ImapConfig; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL -pub struct Server {} - -pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result { - unimplemented!(); - /* let incoming = AddrIncoming::new(config.bind_addr).await?; - tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr); - - let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); - Ok(Server(imap))*/ -} - -impl Server { - pub async fn run(self, mut must_exit: watch::Receiver) -> Result<()> { - tracing::info!("IMAP started!"); - unimplemented!(); - /*tokio::select! { - s = self.0 => s?, - _ = must_exit.changed() => tracing::info!("Stopped IMAP server"), - } - - Ok(())*/ - } -} - -//--- -/* -/// Instance is the main Tokio Tower service that we register in BàL. -/// It receives new connection demands and spawn a dedicated service. -struct Instance { +pub struct Server { + bind_addr: SocketAddr, login_provider: ArcLoginProvider, } -impl Instance { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { login_provider } +struct ClientContext { + stream: AnyStream, + addr: SocketAddr, + login_provider: ArcLoginProvider, + must_exit: watch::Receiver, +} + +pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server { + Server { + bind_addr: config.bind_addr, + login_provider: login, } } -impl<'a> Service<&'a AddrStream> for Instance { - type Response = Connection; - type Error = anyhow::Error; - type Future = BoxFuture<'static, Result>; +impl Server { + pub async fn run(self: Self, mut must_exit: watch::Receiver) -> Result<()> { + let tcp = TcpListener::bind(self.bind_addr).await?; + tracing::info!("IMAP server listening on {:#}", self.bind_addr); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + let mut connections = FuturesUnordered::new(); - fn call(&mut self, addr: &'a AddrStream) -> Self::Future { - tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); - let lp = self.login_provider.clone(); - async { Ok(Connection::new(lp)) }.boxed() + while !*must_exit.borrow() { + let wait_conn_finished = async { + if connections.is_empty() { + futures::future::pending().await + } else { + connections.next().await + } + }; + let (socket, remote_addr) = tokio::select! { + a = tcp.accept() => a?, + _ = wait_conn_finished => continue, + _ = must_exit.changed() => continue, + }; + tracing::info!("IMAP: accepted connection from {}", remote_addr); + + let client = ClientContext { + stream: AnyStream::new(socket), + addr: remote_addr.clone(), + login_provider: self.login_provider.clone(), + must_exit: must_exit.clone(), + }; + let conn = tokio::spawn(client_wrapper(client)); + connections.push(conn); + } + drop(tcp); + + tracing::info!("IMAP server shutting down, draining remaining connections..."); + while connections.next().await.is_some() {} + + Ok(()) } } -//--- - -/// Connection is the per-connection Tokio Tower service we register in BàL. -/// It handles a single TCP connection, and thus has a business logic. -struct Connection { - session: session::Manager, -} - -impl Connection { - pub fn new(login_provider: ArcLoginProvider) -> Self { - Self { - session: session::Manager::new(login_provider), +async fn client_wrapper(ctx: ClientContext) { + let addr = ctx.addr.clone(); + match client(ctx).await { + Ok(()) => { + tracing::info!("closing successful session for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored session for {:?}: {}", addr, e); } } } -impl Service for Connection { - type Response = Response; - type Error = BalError; - type Future = BoxFuture<'static, Result>; +async fn client(mut ctx: ClientContext) -> Result<()> { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + ctx.stream, + ServerFlowOptions::default(), + Greeting::ok(None, "Aerogramme").unwrap(), + ) + .await?; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + use crate::imap::response::{Body, Response as MyResponse}; + use crate::imap::session::Instance; + use imap_codec::imap_types::command::Command; + use imap_codec::imap_types::response::{Response, Status}; + + use tokio::sync::mpsc; + let (cmd_tx, mut cmd_rx) = mpsc::channel::>(10); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::>(); + + let bckgrnd = tokio::spawn(async move { + let mut session = Instance::new(ctx.login_provider); + loop { + let cmd = match cmd_rx.recv().await { + None => break, + Some(cmd_recv) => cmd_recv, + }; + + let maybe_response = session.command(cmd).await; + + match resp_tx.send(maybe_response) { + Err(_) => break, + Ok(_) => (), + }; + } + tracing::info!("runner is quitting"); + }); + + // Main loop + loop { + tokio::select! { + // Managing imap_flow stuff + srv_evt = server.progress() => match srv_evt? { + ServerFlowEvent::ResponseSent { handle: _handle, response } => { + match response { + Response::Status(Status::Bye(_)) => break, + _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + } + }, + ServerFlowEvent::CommandReceived { command } => { + match cmd_tx.try_send(command) { + Ok(_) => (), + Err(mpsc::error::TrySendError::Full(_)) => { + server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + } + _ => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + } + } + }, + }, + + // Managing response generated by Aerogramme + maybe_msg = resp_rx.recv() => { + let response = match maybe_msg { + None => { + server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", ctx.addr); + continue + }, + Some(r) => r, + }; + + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => server.enqueue_data(d), + Body::Status(s) => server.enqueue_status(s), + }; + } + server.enqueue_status(response.completion); + }, + + // When receiving a CTRL+C + _ = ctx.must_exit.changed() => { + server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + }, + }; } - fn call(&mut self, req: Request) -> Self::Future { - tracing::debug!("Got request: {:#?}", req.command); - self.session.process(req) - } + drop(cmd_tx); + bckgrnd.await?; + Ok(()) } -*/ diff --git a/src/imap/response.rs b/src/imap/response.rs index 012c8ed..d20e58e 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -47,11 +47,13 @@ impl<'a> ResponseBuilder<'a> { self } + #[allow(dead_code)] pub fn info(mut self, status: Status<'a>) -> Self { self.body.push(Body::Status(status)); self } + #[allow(dead_code)] pub fn many_info(mut self, status: Vec>) -> Self { for d in status.into_iter() { self = self.info(d); @@ -87,8 +89,8 @@ impl<'a> ResponseBuilder<'a> { } pub struct Response<'a> { - body: Vec>, - completion: Status<'a>, + pub body: Vec>, + pub completion: Status<'a>, } impl<'a> Response<'a> { diff --git a/src/imap/session.rs b/src/imap/session.rs index e2af18b..5c67f8e 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,182 +1,86 @@ -use anyhow::Error; -//use boitalettres::errors::Error as BalError; -//use boitalettres::proto::{Request, Response}; -use futures::future::BoxFuture; -use futures::future::FutureExt; - -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::{mpsc, oneshot}; - use crate::imap::command::{anonymous, authenticated, examined, selected}; use crate::imap::flow; +use crate::imap::response::Response; use crate::login::ArcLoginProvider; - -/* -/* This constant configures backpressure in the system, - * or more specifically, how many pipelined messages are allowed - * before refusing them - */ -const MAX_PIPELINED_COMMANDS: usize = 10; - -struct Message { - req: Request, - tx: oneshot::Sender>, -} +use imap_codec::imap_types::command::Command; //----- - -pub struct Manager { - tx: mpsc::Sender, -} - -impl Manager { - pub fn new(login_provider: ArcLoginProvider) -> Self { - let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { - let instance = Instance::new(login_provider, rx); - instance.start().await; - }); - Self { tx } - } - - pub fn process(&self, req: Request) -> BoxFuture<'static, Result> { - let (tx, rx) = oneshot::channel(); - let msg = Message { req, tx }; - - // We use try_send on a bounded channel to protect the daemons from DoS. - // Pipelining requests in IMAP are a special case: they should not occure often - // and in a limited number (like 3 requests). Someone filling the channel - // will probably be malicious so we "rate limit" them. - match self.tx.try_send(msg) { - Ok(()) => (), - Err(TrySendError::Full(_)) => { - return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed() - } - Err(TrySendError::Closed(_)) => { - return async { Err(BalError::Text("Terminated session".to_string())) }.boxed() - } - }; - - // @FIXME add a timeout, handle a session that fails. - async { - match rx.await { - Ok(r) => r, - Err(e) => { - tracing::warn!("Got error {:#?}", e); - Response::bad("No response from the session handler") - } - } - } - .boxed() - } -} -*/ -//----- -/* pub struct Instance { - rx: mpsc::Receiver, - pub login_provider: ArcLoginProvider, pub state: flow::State, } impl Instance { - fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver) -> Self { + pub fn new(login_provider: ArcLoginProvider) -> Self { Self { login_provider, - rx, state: flow::State::NotAuthenticated, } } - //@FIXME add a function that compute the runner's name from its local info - // to ease debug - // fn name(&self) -> String { } - - async fn start(mut self) { - //@FIXME add more info about the runner - tracing::debug!("starting runner"); - - while let Some(msg) = self.rx.recv().await { - // Command behavior is modulated by the state. - // To prevent state error, we handle the same command in separate code paths. - let ctrl = match &mut self.state { - flow::State::NotAuthenticated => { - let ctx = anonymous::AnonymousContext { - req: &msg.req, - login_provider: Some(&self.login_provider), - }; - anonymous::dispatch(ctx).await - } - flow::State::Authenticated(ref user) => { - let ctx = authenticated::AuthenticatedContext { - req: &msg.req, - user, - }; - authenticated::dispatch(ctx).await - } - flow::State::Selected(ref user, ref mut mailbox) => { - let ctx = selected::SelectedContext { - req: &msg.req, - user, - mailbox, - }; - selected::dispatch(ctx).await - } - flow::State::Examined(ref user, ref mut mailbox) => { - let ctx = examined::ExaminedContext { - req: &msg.req, - user, - mailbox, - }; - examined::dispatch(ctx).await - } - flow::State::Logout => { - Response::bad("No commands are allowed in the LOGOUT state.") - .map(|r| (r, flow::Transition::None)) - .map_err(Error::msg) - } - }; - - // Process result - let res = match ctrl { - Ok((res, tr)) => { - //@FIXME remove unwrap - self.state = match self.state.apply(tr) { - Ok(new_state) => new_state, - Err(e) => { - tracing::error!("Invalid transition: {}, exiting", e); - break; - } - }; - - //@FIXME enrich here the command with some global status - - Ok(res) - } - // Cast from anyhow::Error to Bal::Error - // @FIXME proper error handling would be great - Err(e) => match e.downcast::() { - Ok(be) => Err(be), - Err(e) => { - tracing::warn!(error=%e, "internal.error"); - Response::bad("Internal error") - } - }, - }; - - //@FIXME I think we should quit this thread on error and having our manager watch it, - // and then abort the session as it is corrupted. - msg.tx.send(res).unwrap_or_else(|e| { - tracing::warn!("failed to send imap response to manager: {:#?}", e) - }); - - if let flow::State::Logout = &self.state { - break; + pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> { + // Command behavior is modulated by the state. + // To prevent state error, we handle the same command in separate code paths. + let (resp, tr) = match &mut self.state { + flow::State::NotAuthenticated => { + let ctx = anonymous::AnonymousContext { + req: &cmd, + login_provider: &self.login_provider, + }; + anonymous::dispatch(ctx).await } + flow::State::Authenticated(ref user) => { + let ctx = authenticated::AuthenticatedContext { req: &cmd, user }; + authenticated::dispatch(ctx).await + } + flow::State::Selected(ref user, ref mut mailbox) => { + let ctx = selected::SelectedContext { + req: &cmd, + user, + mailbox, + }; + selected::dispatch(ctx).await + } + flow::State::Examined(ref user, ref mut mailbox) => { + let ctx = examined::ExaminedContext { + req: &cmd, + user, + mailbox, + }; + examined::dispatch(ctx).await + } + flow::State::Logout => Response::build() + .tag(cmd.tag.clone()) + .message("No commands are allowed in the LOGOUT state.") + .bad() + .map(|r| (r, flow::Transition::None)), + } + .unwrap_or_else(|err| { + tracing::error!("Command error {:?} occured while processing {:?}", err, cmd); + ( + Response::build() + .to_req(&cmd) + .message("Internal error while processing command") + .bad() + .unwrap(), + flow::Transition::None, + ) + }); + + if let Err(e) = self.state.apply(tr) { + tracing::error!( + "Transition error {:?} occured while processing on command {:?}", + e, + cmd + ); + return Response::build() + .to_req(&cmd) + .message( + "Internal error, processing command triggered an illegal IMAP state transition", + ) + .bad() + .unwrap(); } - //@FIXME add more info about the runner - tracing::debug!("exiting runner"); + resp } } -*/ diff --git a/src/server.rs b/src/server.rs index 8bfde98..bd2fd5d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -25,7 +25,7 @@ impl Server { let login = Arc::new(StaticLoginProvider::new(config.users).await?); let lmtp_server = None; - let imap_server = Some(imap::new(config.imap, login.clone()).await?); + let imap_server = Some(imap::new(config.imap, login.clone())); Ok(Self { lmtp_server, imap_server, @@ -42,7 +42,7 @@ impl Server { }; let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone())); - let imap_server = Some(imap::new(config.imap, login.clone()).await?); + let imap_server = Some(imap::new(config.imap, login.clone())); Ok(Self { lmtp_server,