Refactor to allow mutability

This commit is contained in:
Alex 2022-06-29 12:50:44 +02:00
parent 9979671b00
commit 90b143e1c5
Signed by: lx
GPG key ID: 0E496D15096376BE
9 changed files with 163 additions and 125 deletions

6
Cargo.lock generated
View file

@ -2106,7 +2106,7 @@ dependencies = [
[[package]] [[package]]
name = "smtp-message" name = "smtp-message"
version = "0.1.0" version = "0.1.0"
source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a"
dependencies = [ dependencies = [
"auto_enums", "auto_enums",
"futures", "futures",
@ -2121,7 +2121,7 @@ dependencies = [
[[package]] [[package]]
name = "smtp-server" name = "smtp-server"
version = "0.1.0" version = "0.1.0"
source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
@ -2135,7 +2135,7 @@ dependencies = [
[[package]] [[package]]
name = "smtp-server-types" name = "smtp-server-types"
version = "0.1.0" version = "0.1.0"
source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#245cd13212db727d4085768b813a0ee09a137bc3" source = "git+http://github.com/Alexis211/kannader?branch=feature/lmtp#0a5ceb0f9a99d76d72bf105ee4df1f11629d812a"
dependencies = [ dependencies = [
"serde", "serde",
"smtp-message", "smtp-message",

View file

@ -1,37 +1,43 @@
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use boitalettres::proto::{res::body::Data as Body, Response}; use boitalettres::proto::{res::body::Data as Body, Request, Response};
use imap_codec::types::command::CommandBody; use imap_codec::types::command::CommandBody;
use imap_codec::types::core::{AString, Atom}; use imap_codec::types::core::{AString, Atom};
use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status}; use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::session::InnerContext; use crate::login::ArcLoginProvider;
//--- dispatching //--- dispatching
pub async fn dispatch<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { pub struct AnonymousContext<'a> {
pub req: &'a Request,
pub login_provider: Option<&'a ArcLoginProvider>,
}
pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body { match &ctx.req.command.body {
CommandBody::Noop => Ok((Response::ok("Noop completed.")?, flow::Transition::No)), CommandBody::Noop => Ok((Response::ok("Noop completed.")?, flow::Transition::None)),
CommandBody::Capability => capability(ctx).await, CommandBody::Capability => ctx.capability().await,
CommandBody::Logout => logout(ctx).await, CommandBody::Logout => ctx.logout().await,
CommandBody::Login { username, password } => login(ctx, username, password).await, CommandBody::Login { username, password } => ctx.login(username, password).await,
_ => Ok(( _ => Ok((
Response::no("This command is not available in the ANONYMOUS state.")?, Response::no("This command is not available in the ANONYMOUS state.")?,
flow::Transition::No, flow::Transition::None,
)), )),
} }
} }
//--- Command controllers, private //--- Command controllers, private
async fn capability<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { impl<'a> AnonymousContext<'a> {
async fn capability(self) -> Result<(Response, flow::Transition)> {
let capabilities = vec![Capability::Imap4Rev1, Capability::Idle]; let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
let res = Response::ok("Server capabilities")?.with_body(Data::Capability(capabilities)); let res = Response::ok("Server capabilities")?.with_body(Data::Capability(capabilities));
Ok((res, flow::Transition::No)) Ok((res, flow::Transition::None))
} }
async fn login<'a>( async fn login(
ctx: InnerContext<'a>, self,
username: &AString, username: &AString,
password: &AString, password: &AString,
) -> Result<(Response, flow::Transition)> { ) -> Result<(Response, flow::Transition)> {
@ -41,10 +47,23 @@ async fn login<'a>(
); );
tracing::info!(user = %u, "command.login"); tracing::info!(user = %u, "command.login");
let creds = match ctx.login.login(&u, &p).await { let login_provider = match &self.login_provider {
Some(lp) => lp,
None => {
return Ok((
Response::no("Login command not available (already logged in)")?,
flow::Transition::None,
))
}
};
let creds = match login_provider.login(&u, &p).await {
Err(e) => { Err(e) => {
tracing::debug!(error=%e, "authentication failed"); tracing::debug!(error=%e, "authentication failed");
return Ok((Response::no("Authentication failed")?, flow::Transition::No)); return Ok((
Response::no("Authentication failed")?,
flow::Transition::None,
));
} }
Ok(c) => c, Ok(c) => c,
}; };
@ -60,10 +79,11 @@ async fn login<'a>(
flow::Transition::Authenticate(user), flow::Transition::Authenticate(user),
)) ))
} }
// C: 10 logout // C: 10 logout
// S: * BYE Logging out // S: * BYE Logging out
// S: 10 OK Logout completed. // S: 10 OK Logout completed.
async fn logout<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition)> { async fn logout(self) -> Result<(Response, flow::Transition)> {
// @FIXME we should implement From<Vec<Status>> and From<Vec<ImapStatus>> in // @FIXME we should implement From<Vec<Status>> and From<Vec<ImapStatus>> in
// boitalettres/src/proto/res/body.rs // boitalettres/src/proto/res/body.rs
Ok(( Ok((
@ -74,3 +94,4 @@ async fn logout<'a>(ctx: InnerContext<'a>) -> Result<(Response, flow::Transition
flow::Transition::Logout, flow::Transition::Logout,
)) ))
} }
}

View file

@ -1,5 +1,5 @@
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
use boitalettres::proto::{res::body::Data as Body, Response}; use boitalettres::proto::{res::body::Data as Body, Request, Response};
use imap_codec::types::command::CommandBody; use imap_codec::types::command::CommandBody;
use imap_codec::types::core::Atom; use imap_codec::types::core::Atom;
use imap_codec::types::flag::Flag; use imap_codec::types::flag::Flag;
@ -19,13 +19,13 @@ const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Draft, Flag::Draft,
]; ];
pub async fn dispatch<'a>( pub struct AuthenticatedContext<'a> {
inner: InnerContext<'a>, pub req: &'a Request,
user: &'a flow::User, pub user: &'a flow::User,
) -> Result<(Response, flow::Transition)> { }
let ctx = StateContext { user, inner };
match &ctx.inner.req.command.body { pub async fn dispatch<'a>(ctx: AuthenticatedContext<'a>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
CommandBody::Lsub { CommandBody::Lsub {
reference, reference,
mailbox_wildcard, mailbox_wildcard,
@ -35,32 +35,33 @@ pub async fn dispatch<'a>(
mailbox_wildcard, mailbox_wildcard,
} => ctx.list(reference, mailbox_wildcard).await, } => ctx.list(reference, mailbox_wildcard).await,
CommandBody::Select { mailbox } => ctx.select(mailbox).await, CommandBody::Select { mailbox } => ctx.select(mailbox).await,
_ => anonymous::dispatch(ctx.inner).await, _ => {
let ctx = anonymous::AnonymousContext {
req: ctx.req,
login_provider: None,
};
anonymous::dispatch(ctx).await
}
} }
} }
// --- PRIVATE --- // --- PRIVATE ---
struct StateContext<'a> { impl<'a> AuthenticatedContext<'a> {
inner: InnerContext<'a>,
user: &'a flow::User,
}
impl<'a> StateContext<'a> {
async fn lsub( async fn lsub(
&self, self,
reference: &MailboxCodec, reference: &MailboxCodec,
mailbox_wildcard: &ListMailbox, mailbox_wildcard: &ListMailbox,
) -> Result<(Response, flow::Transition)> { ) -> Result<(Response, flow::Transition)> {
Ok((Response::bad("Not implemented")?, flow::Transition::No)) Ok((Response::bad("Not implemented")?, flow::Transition::None))
} }
async fn list( async fn list(
&self, self,
reference: &MailboxCodec, reference: &MailboxCodec,
mailbox_wildcard: &ListMailbox, mailbox_wildcard: &ListMailbox,
) -> Result<(Response, flow::Transition)> { ) -> Result<(Response, flow::Transition)> {
Ok((Response::bad("Not implemented")?, flow::Transition::No)) Ok((Response::bad("Not implemented")?, flow::Transition::None))
} }
/* /*
@ -91,7 +92,7 @@ impl<'a> StateContext<'a> {
* TRACE END --- * TRACE END ---
*/ */
async fn select(&self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> { async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?; let name = String::try_from(mailbox.clone())?;
let mut mb = Mailbox::new(&self.user.creds, name.clone())?; let mut mb = Mailbox::new(&self.user.creds, name.clone())?;

View file

@ -1,4 +1,5 @@
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use boitalettres::proto::Request;
use boitalettres::proto::Response; use boitalettres::proto::Response;
use imap_codec::types::command::CommandBody; use imap_codec::types::command::CommandBody;
use imap_codec::types::core::Tag; use imap_codec::types::core::Tag;
@ -11,42 +12,38 @@ use crate::imap::flow;
use crate::imap::session::InnerContext; use crate::imap::session::InnerContext;
use crate::mail::Mailbox; use crate::mail::Mailbox;
pub async fn dispatch<'a>( pub struct SelectedContext<'a> {
inner: InnerContext<'a>, pub req: &'a Request,
user: &'a flow::User, pub user: &'a flow::User,
mailbox: &'a Mailbox, pub mailbox: &'a mut Mailbox,
) -> Result<(Response, flow::Transition)> { }
let ctx = StateContext {
inner,
user,
mailbox,
};
match &ctx.inner.req.command.body { pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
CommandBody::Fetch { CommandBody::Fetch {
sequence_set, sequence_set,
attributes, attributes,
uid, uid,
} => ctx.fetch(sequence_set, attributes, uid).await, } => ctx.fetch(sequence_set, attributes, uid).await,
_ => authenticated::dispatch(ctx.inner, user).await, _ => {
let ctx = authenticated::AuthenticatedContext {
req: ctx.req,
user: ctx.user,
};
authenticated::dispatch(ctx).await
}
} }
} }
// --- PRIVATE --- // --- PRIVATE ---
struct StateContext<'a> { impl<'a> SelectedContext<'a> {
inner: InnerContext<'a>,
user: &'a flow::User,
mailbox: &'a Mailbox,
}
impl<'a> StateContext<'a> {
pub async fn fetch( pub async fn fetch(
&self, self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
attributes: &MacroOrFetchAttributes, attributes: &MacroOrFetchAttributes,
uid: &bool, uid: &bool,
) -> Result<(Response, flow::Transition)> { ) -> Result<(Response, flow::Transition)> {
Ok((Response::bad("Not implemented")?, flow::Transition::No)) Ok((Response::bad("Not implemented")?, flow::Transition::None))
} }
} }

View file

@ -28,7 +28,7 @@ pub enum State {
} }
pub enum Transition { pub enum Transition {
No, None,
Authenticate(User), Authenticate(User),
Select(Mailbox), Select(Mailbox),
Unselect, Unselect,
@ -40,7 +40,7 @@ pub enum Transition {
impl State { impl State {
pub fn apply(self, tr: Transition) -> Result<Self, Error> { pub fn apply(self, tr: Transition) -> Result<Self, Error> {
match (self, tr) { match (self, tr) {
(s, Transition::No) => Ok(s), (s, Transition::None) => Ok(s),
(State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)), (State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)),
(State::Authenticated(u), Transition::Select(m)) => Ok(State::Selected(u, m)), (State::Authenticated(u), Transition::Select(m)) => Ok(State::Selected(u, m)),
(State::Selected(u, _), Transition::Unselect) => Ok(State::Authenticated(u)), (State::Selected(u, _), Transition::Unselect) => Ok(State::Authenticated(u)),

View file

@ -20,6 +20,7 @@ use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL /// Server is a thin wrapper to register our Services in BàL
pub struct Server(ImapServer<AddrIncoming, Instance>); pub struct Server(ImapServer<AddrIncoming, Instance>);
pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> { pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> {
//@FIXME add a configuration parameter //@FIXME add a configuration parameter
let incoming = AddrIncoming::new(config.bind_addr).await?; let incoming = AddrIncoming::new(config.bind_addr).await?;
@ -28,6 +29,7 @@ pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server>
let imap = ImapServer::new(incoming).serve(Instance::new(login.clone())); let imap = ImapServer::new(incoming).serve(Instance::new(login.clone()));
Ok(Server(imap)) Ok(Server(imap))
} }
impl Server { impl Server {
pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> { pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
tracing::info!("IMAP started!"); tracing::info!("IMAP started!");
@ -47,11 +49,13 @@ impl Server {
struct Instance { struct Instance {
login_provider: ArcLoginProvider, login_provider: ArcLoginProvider,
} }
impl Instance { impl Instance {
pub fn new(login_provider: ArcLoginProvider) -> Self { pub fn new(login_provider: ArcLoginProvider) -> Self {
Self { login_provider } Self { login_provider }
} }
} }
impl<'a> Service<&'a AddrStream> for Instance { impl<'a> Service<&'a AddrStream> for Instance {
type Response = Connection; type Response = Connection;
type Error = anyhow::Error; type Error = anyhow::Error;
@ -75,6 +79,7 @@ impl<'a> Service<&'a AddrStream> for Instance {
struct Connection { struct Connection {
session: session::Manager, session: session::Manager,
} }
impl Connection { impl Connection {
pub fn new(login_provider: ArcLoginProvider) -> Self { pub fn new(login_provider: ArcLoginProvider) -> Self {
Self { Self {
@ -82,6 +87,7 @@ impl Connection {
} }
} }
} }
impl Service<Request> for Connection { impl Service<Request> for Connection {
type Response = Response; type Response = Response;
type Error = BalError; type Error = BalError;

View file

@ -102,23 +102,36 @@ impl Instance {
tracing::debug!("starting runner"); tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await { while let Some(msg) = self.rx.recv().await {
let ctx = InnerContext {
req: &msg.req,
state: &self.state,
login: &self.login_provider,
};
// Command behavior is modulated by the state. // Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths. // To prevent state error, we handle the same command in separate code paths.
let ctrl = match &self.state { let ctrl = match &mut self.state {
flow::State::NotAuthenticated => anonymous::dispatch(ctx).await, flow::State::NotAuthenticated => {
flow::State::Authenticated(user) => authenticated::dispatch(ctx, user).await, let ctx = anonymous::AnonymousContext {
flow::State::Selected(user, mailbox) => { req: &msg.req,
selected::dispatch(ctx, user, mailbox).await login_provider: Some(&self.login_provider),
};
anonymous::dispatch(ctx).await
}
flow::State::Authenticated(ref user) => {
let ctx = authenticated::AuthenticatedContext {
req: &msg.req,
user,
};
authenticated::dispatch(ctx).await
}
flow::State::Selected(ref user, ref mut mailbox) => {
let ctx = selected::SelectedContext {
req: &msg.req,
user,
mailbox,
};
selected::dispatch(ctx).await
}
flow::State::Logout => {
Response::bad("No commands are allowed in the LOGOUT state.")
.map(|r| (r, flow::Transition::None))
.map_err(Error::msg)
} }
_ => Response::bad("No commands are allowed in the LOGOUT state.")
.map(|r| (r, flow::Transition::No))
.map_err(Error::msg),
}; };
// Process result // Process result

View file

@ -42,6 +42,8 @@ impl LmtpServer {
pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> { pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?; let tcp = TcpListener::bind(self.bind_addr).await?;
info!("LMTP server listening on {:#}", self.bind_addr);
let mut connections = FuturesUnordered::new(); let mut connections = FuturesUnordered::new();
while !*must_exit.borrow() { while !*must_exit.borrow() {
@ -155,17 +157,14 @@ impl Config for LmtpServer {
} }
} }
async fn handle_mail<'a, 'slife0, 'slife1, 'stream, R>( async fn handle_mail<'resp, R>(
&'slife0 self, &'resp self,
reader: &mut EscapedDataReader<'a, R>, reader: &mut EscapedDataReader<'_, R>,
meta: MailMetadata<Message>, meta: MailMetadata<Message>,
conn_meta: &'slife1 mut ConnectionMetadata<Conn>, conn_meta: &'resp mut ConnectionMetadata<Conn>,
) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'stream>> ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>>
where where
R: Send + Unpin + AsyncRead, R: Send + Unpin + AsyncRead,
'slife0: 'stream,
'slife1: 'stream,
Self: 'stream,
{ {
let err_response_stream = |meta: MailMetadata<Message>, msg: String| { let err_response_stream = |meta: MailMetadata<Message>, msg: String| {
Box::pin( Box::pin(

View file

@ -10,12 +10,12 @@ pub type ImapUid = NonZeroU32;
pub type ImapUidvalidity = NonZeroU32; pub type ImapUidvalidity = NonZeroU32;
pub type Flag = String; pub type Flag = String;
#[derive(Clone)]
/// A UidIndex handles the mutable part of a mailbox /// A UidIndex handles the mutable part of a mailbox
/// It is built by running the event log on it /// It is built by running the event log on it
/// Each applied log generates a new UidIndex by cloning the previous one /// Each applied log generates a new UidIndex by cloning the previous one
/// and applying the event. This is why we use immutable datastructures: /// and applying the event. This is why we use immutable datastructures:
/// they are cheap to clone. /// they are cheap to clone.
#[derive(Clone)]
pub struct UidIndex { pub struct UidIndex {
// Source of trust // Source of trust
pub table: OrdMap<MailIdent, (ImapUid, Vec<Flag>)>, pub table: OrdMap<MailIdent, (ImapUid, Vec<Flag>)>,
@ -162,6 +162,7 @@ impl BayouState for UidIndex {
} }
// ---- FlagIndex implementation ---- // ---- FlagIndex implementation ----
#[derive(Clone)] #[derive(Clone)]
pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>); pub struct FlagIndex(HashMap<Flag, OrdSet<ImapUid>>);
pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>; pub type FlagIter<'a> = im::hashmap::Keys<'a, Flag, OrdSet<ImapUid>>;