From 90b143e1c57c6561998176878b2cc586b2d89c80 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 29 Jun 2022 12:50:44 +0200 Subject: [PATCH] Refactor to allow mutability --- Cargo.lock | 6 +- src/imap/command/anonymous.rs | 135 +++++++++++++++++------------- src/imap/command/authenticated.rs | 39 ++++----- src/imap/command/selected.rs | 39 ++++----- src/imap/flow.rs | 4 +- src/imap/mod.rs | 6 ++ src/imap/session.rs | 41 +++++---- src/lmtp.rs | 15 ++-- src/mail/uidindex.rs | 3 +- 9 files changed, 163 insertions(+), 125 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44c9cb1..b433689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2106,7 +2106,7 @@ dependencies = [ [[package]] name = "smtp-message" version = "0.1.0" -source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" +source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a" dependencies = [ "auto_enums", "futures", @@ -2121,7 +2121,7 @@ dependencies = [ [[package]] name = "smtp-server" version = "0.1.0" -source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" +source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a" dependencies = [ "async-trait", "chrono", @@ -2135,7 +2135,7 @@ dependencies = [ [[package]] name = "smtp-server-types" version = "0.1.0" -source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" +source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a" dependencies = [ "serde", "smtp-message", diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs index 2ab3f97..f5707ef 100644 --- a/src/imap/command/anonymous.rs +++ b/src/imap/command/anonymous.rs @@ -1,76 +1,97 @@ use anyhow::{Error, Result}; -use boitalettres::proto::{res::body::Data as Body, Response}; +use boitalettres::proto::{res::body::Data as Body, Request, Response}; use imap_codec::types::command::CommandBody; use imap_codec::types::core::{AString, Atom}; use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; use crate::imap::flow; -use crate::imap::session::InnerContext; +use crate::login::ArcLoginProvider; //--- dispatching -pub async fn dispatch<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { +pub struct AnonymousContext<'a> { + pub req: &'a Request, + pub login_provider: Option<&'a ArcLoginProvider>, +} + +pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response, flow::Transition)> { match &ctx.req.command.body { - CommandBody::Noop => Ok((Response::ok("Noop completed.")?, flow::Transition::No)), - CommandBody::Capability => capability(ctx).await, - CommandBody::Logout => logout(ctx).await, - CommandBody::Login { username, password } => login(ctx, username, password).await, + CommandBody::Noop => Ok((Response::ok("Noop completed.")?, flow::Transition::None)), + CommandBody::Capability => ctx.capability().await, + CommandBody::Logout => ctx.logout().await, + CommandBody::Login { username, password } => ctx.login(username, password).await, _ => Ok(( Response::no("This command is not available in the ANONYMOUS state.")?, - flow::Transition::No, + flow::Transition::None, )), } } //--- Command controllers, private -async fn capability<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { - let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; - let res = Response::ok("Server capabilities")?.with_body(Data::Capability(capabilities)); - Ok((res, flow::Transition::No)) -} - -async fn login<'a>( - ctx: InnerContext<'a>, - username: &AString, - password: &AString, -) -> Result<(Response, flow::Transition)> { - let (u, p) = ( - String::try_from(username.clone())?, - String::try_from(password.clone())?, - ); - tracing::info!(user = %u, "command.login"); - - let creds = match ctx.login.login(&u, &p).await { - Err(e) => { - tracing::debug!(error=%e, "authentication failed"); - return Ok((Response::no("Authentication failed")?, flow::Transition::No)); - } - Ok(c) => c, - }; - - let user = flow::User { - creds, - name: u.clone(), - }; - - tracing::info!(username=%u, "connected"); - Ok(( - Response::ok("Completed")?, - flow::Transition::Authenticate(user), - )) -} -// C: 10 logout -// S: * BYE Logging out -// S: 10 OK Logout completed. -async fn logout<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { - // @FIXME we should implement From> and From> in - // boitalettres/src/proto/res/body.rs - Ok(( - Response::ok("Logout completed")?.with_body(vec![Body::Status( - Status::bye(None, "Logging out") - .map_err(|e| Error::msg(e).context("Unable to generate IMAP status"))?, - )]), - flow::Transition::Logout, - )) +impl<'a> AnonymousContext<'a> { + async fn capability(self) -> Result<(Response, flow::Transition)> { + let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; + let res = Response::ok("Server capabilities")?.with_body(Data::Capability(capabilities)); + Ok((res, flow::Transition::None)) + } + + async fn login( + self, + username: &AString, + password: &AString, + ) -> Result<(Response, flow::Transition)> { + let (u, p) = ( + String::try_from(username.clone())?, + String::try_from(password.clone())?, + ); + tracing::info!(user = %u, "command.login"); + + let login_provider = match &self.login_provider { + Some(lp) => lp, + None => { + return Ok(( + Response::no("Login command not available (already logged in)")?, + flow::Transition::None, + )) + } + }; + + let creds = match login_provider.login(&u, &p).await { + Err(e) => { + tracing::debug!(error=%e, "authentication failed"); + return Ok(( + Response::no("Authentication failed")?, + flow::Transition::None, + )); + } + Ok(c) => c, + }; + + let user = flow::User { + creds, + name: u.clone(), + }; + + tracing::info!(username=%u, "connected"); + Ok(( + Response::ok("Completed")?, + flow::Transition::Authenticate(user), + )) + } + + // C: 10 logout + // S: * BYE Logging out + // S: 10 OK Logout completed. + async fn logout(self) -> Result<(Response, flow::Transition)> { + // @FIXME we should implement From> and From> in + // boitalettres/src/proto/res/body.rs + Ok(( + Response::ok("Logout completed")?.with_body(vec![Body::Status( + Status::bye(None, "Logging out") + .map_err(|e| Error::msg(e).context("Unable to generate IMAP status"))?, + )]), + flow::Transition::Logout, + )) + } } diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index f22fcc4..c7e5642 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Error, Result}; -use boitalettres::proto::{res::body::Data as Body, Response}; +use boitalettres::proto::{res::body::Data as Body, Request, Response}; use imap_codec::types::command::CommandBody; use imap_codec::types::core::Atom; use imap_codec::types::flag::Flag; @@ -19,13 +19,13 @@ const DEFAULT_FLAGS: [Flag; 5] = [ Flag::Draft, ]; -pub async fn dispatch<'a>( - inner: InnerContext<'a>, - user: &'a flow::User, -) -> Result<(Response, flow::Transition)> { - let ctx = StateContext { user, inner }; +pub struct AuthenticatedContext<'a> { + pub req: &'a Request, + pub user: &'a flow::User, +} - match &ctx.inner.req.command.body { +pub async fn dispatch<'a>(ctx: AuthenticatedContext<'a>) -> Result<(Response, flow::Transition)> { + match &ctx.req.command.body { CommandBody::Lsub { reference, mailbox_wildcard, @@ -35,32 +35,33 @@ pub async fn dispatch<'a>( mailbox_wildcard, } => ctx.list(reference, mailbox_wildcard).await, CommandBody::Select { mailbox } => ctx.select(mailbox).await, - _ => anonymous::dispatch(ctx.inner).await, + _ => { + let ctx = anonymous::AnonymousContext { + req: ctx.req, + login_provider: None, + }; + anonymous::dispatch(ctx).await + } } } // --- PRIVATE --- -struct StateContext<'a> { - inner: InnerContext<'a>, - user: &'a flow::User, -} - -impl<'a> StateContext<'a> { +impl<'a> AuthenticatedContext<'a> { async fn lsub( - &self, + self, reference: &MailboxCodec, mailbox_wildcard: &ListMailbox, ) -> Result<(Response, flow::Transition)> { - Ok((Response::bad("Not implemented")?, flow::Transition::No)) + Ok((Response::bad("Not implemented")?, flow::Transition::None)) } async fn list( - &self, + self, reference: &MailboxCodec, mailbox_wildcard: &ListMailbox, ) -> Result<(Response, flow::Transition)> { - Ok((Response::bad("Not implemented")?, flow::Transition::No)) + Ok((Response::bad("Not implemented")?, flow::Transition::None)) } /* @@ -91,7 +92,7 @@ impl<'a> StateContext<'a> { * TRACE END --- */ - async fn select(&self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> { + async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> { let name = String::try_from(mailbox.clone())?; let mut mb = Mailbox::new(&self.user.creds, name.clone())?; diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index cf0b71b..4e41561 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,4 +1,5 @@ use anyhow::{Error, Result}; +use boitalettres::proto::Request; use boitalettres::proto::Response; use imap_codec::types::command::CommandBody; use imap_codec::types::core::Tag; @@ -11,42 +12,38 @@ use crate::imap::flow; use crate::imap::session::InnerContext; use crate::mail::Mailbox; -pub async fn dispatch<'a>( - inner: InnerContext<'a>, - user: &'a flow::User, - mailbox: &'a Mailbox, -) -> Result<(Response, flow::Transition)> { - let ctx = StateContext { - inner, - user, - mailbox, - }; +pub struct SelectedContext<'a> { + pub req: &'a Request, + pub user: &'a flow::User, + pub mailbox: &'a mut Mailbox, +} - match &ctx.inner.req.command.body { +pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::Transition)> { + match &ctx.req.command.body { CommandBody::Fetch { sequence_set, attributes, uid, } => ctx.fetch(sequence_set, attributes, uid).await, - _ => authenticated::dispatch(ctx.inner, user).await, + _ => { + let ctx = authenticated::AuthenticatedContext { + req: ctx.req, + user: ctx.user, + }; + authenticated::dispatch(ctx).await + } } } // --- PRIVATE --- -struct StateContext<'a> { - inner: InnerContext<'a>, - user: &'a flow::User, - mailbox: &'a Mailbox, -} - -impl<'a> StateContext<'a> { +impl<'a> SelectedContext<'a> { pub async fn fetch( - &self, + self, sequence_set: &SequenceSet, attributes: &MacroOrFetchAttributes, uid: &bool, ) -> Result<(Response, flow::Transition)> { - Ok((Response::bad("Not implemented")?, flow::Transition::No)) + Ok((Response::bad("Not implemented")?, flow::Transition::None)) } } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index bd4c484..7370bd1 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -28,7 +28,7 @@ pub enum State { } pub enum Transition { - No, + None, Authenticate(User), Select(Mailbox), Unselect, @@ -40,7 +40,7 @@ pub enum Transition { impl State { pub fn apply(self, tr: Transition) -> Result { match (self, tr) { - (s, Transition::No) => Ok(s), + (s, Transition::None) => Ok(s), (State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)), (State::Authenticated(u), Transition::Select(m)) => Ok(State::Selected(u, m)), (State::Selected(u, _), Transition::Unselect) => Ok(State::Authenticated(u)), diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 63f0220..b725859 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -20,6 +20,7 @@ use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL pub struct Server(ImapServer); + pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result { //@FIXME add a configuration parameter let incoming = AddrIncoming::new(config.bind_addr).await?; @@ -28,6 +29,7 @@ pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); Ok(Server(imap)) } + impl Server { pub async fn run(self, mut must_exit: watch::Receiver) -> Result<()> { tracing::info!("IMAP started!"); @@ -47,11 +49,13 @@ impl Server { struct Instance { login_provider: ArcLoginProvider, } + impl Instance { pub fn new(login_provider: ArcLoginProvider) -> Self { Self { login_provider } } } + impl<'a> Service<&'a AddrStream> for Instance { type Response = Connection; type Error = anyhow::Error; @@ -75,6 +79,7 @@ impl<'a> Service<&'a AddrStream> for Instance { struct Connection { session: session::Manager, } + impl Connection { pub fn new(login_provider: ArcLoginProvider) -> Self { Self { @@ -82,6 +87,7 @@ impl Connection { } } } + impl Service for Connection { type Response = Response; type Error = BalError; diff --git a/src/imap/session.rs b/src/imap/session.rs index 30885d1..72dd9d8 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -102,23 +102,36 @@ impl Instance { tracing::debug!("starting runner"); while let Some(msg) = self.rx.recv().await { - let ctx = InnerContext { - req: &msg.req, - state: &self.state, - login: &self.login_provider, - }; - // Command behavior is modulated by the state. // To prevent state error, we handle the same command in separate code paths. - let ctrl = match &self.state { - flow::State::NotAuthenticated => anonymous::dispatch(ctx).await, - flow::State::Authenticated(user) => authenticated::dispatch(ctx, user).await, - flow::State::Selected(user, mailbox) => { - selected::dispatch(ctx, user, mailbox).await + let ctrl = match &mut self.state { + flow::State::NotAuthenticated => { + let ctx = anonymous::AnonymousContext { + req: &msg.req, + login_provider: Some(&self.login_provider), + }; + anonymous::dispatch(ctx).await + } + flow::State::Authenticated(ref user) => { + let ctx = authenticated::AuthenticatedContext { + req: &msg.req, + user, + }; + authenticated::dispatch(ctx).await + } + flow::State::Selected(ref user, ref mut mailbox) => { + let ctx = selected::SelectedContext { + req: &msg.req, + user, + mailbox, + }; + selected::dispatch(ctx).await + } + flow::State::Logout => { + Response::bad("No commands are allowed in the LOGOUT state.") + .map(|r| (r, flow::Transition::None)) + .map_err(Error::msg) } - _ => Response::bad("No commands are allowed in the LOGOUT state.") - .map(|r| (r, flow::Transition::No)) - .map_err(Error::msg), }; // Process result diff --git a/src/lmtp.rs b/src/lmtp.rs index a0dafa5..29d61af 100644 --- a/src/lmtp.rs +++ b/src/lmtp.rs @@ -42,6 +42,8 @@ impl LmtpServer { pub async fn run(self: &Arc, mut must_exit: watch::Receiver) -> Result<()> { let tcp = TcpListener::bind(self.bind_addr).await?; + info!("LMTP server listening on {:#}", self.bind_addr); + let mut connections = FuturesUnordered::new(); while !*must_exit.borrow() { @@ -155,17 +157,14 @@ impl Config for LmtpServer { } } - async fn handle_mail<'a, 'slife0, 'slife1, 'stream, R>( - &'slife0 self, - reader: &mut EscapedDataReader<'a, R>, + async fn handle_mail<'resp, R>( + &'resp self, + reader: &mut EscapedDataReader<'_, R>, meta: MailMetadata, - conn_meta: &'slife1 mut ConnectionMetadata, - ) -> Pin> + Send + 'stream>> + conn_meta: &'resp mut ConnectionMetadata, + ) -> Pin> + Send + 'resp>> where R: Send + Unpin + AsyncRead, - 'slife0: 'stream, - 'slife1: 'stream, - Self: 'stream, { let err_response_stream = |meta: MailMetadata, msg: String| { Box::pin( diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index ad27c1b..1e78d54 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -10,12 +10,12 @@ pub type ImapUid = NonZeroU32; pub type ImapUidvalidity = NonZeroU32; pub type Flag = String; -#[derive(Clone)] /// A UidIndex handles the mutable part of a mailbox /// It is built by running the event log on it /// Each applied log generates a new UidIndex by cloning the previous one /// and applying the event. This is why we use immutable datastructures: /// they are cheap to clone. +#[derive(Clone)] pub struct UidIndex { // Source of trust pub table: OrdMap)>, @@ -162,6 +162,7 @@ impl BayouState for UidIndex { } // ---- FlagIndex implementation ---- + #[derive(Clone)] pub struct FlagIndex(HashMap>); pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet>;