Implement imap-flow #34

Merged
quentin merged 18 commits from refactor/imap-flow into main 2024-01-02 22:44:29 +00:00
11 changed files with 313 additions and 310 deletions
Showing only changes of commit 0d667a3030 - Show all commits

View file

@ -1,7 +1,6 @@
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody};
use imap_codec::imap_types::core::{AString, NonEmptyVec};
use imap_codec::imap_types::response::{Capability, Data};
use imap_codec::imap_types::core::AString;
use imap_codec::imap_types::secret::Secret;
use crate::imap::command::anystate;
@ -13,16 +12,16 @@ use crate::mail::user::User;
//--- dispatching
pub struct AnonymousContext<'a> {
pub req: &'a Command<'a>,
pub req: &'a Command<'static>,
pub login_provider: &'a ArcLoginProvider,
}
pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub async fn dispatch(ctx: AnonymousContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
CommandBody::Logout => anystate::logout(),
// Specific to anonymous context (3 commands)
CommandBody::Login { username, password } => ctx.login(username, password).await,
@ -39,22 +38,11 @@ pub async fn dispatch<'a>(ctx: AnonymousContext<'a>) -> Result<(Response<'a>, fl
//--- Command controllers, private
impl<'a> AnonymousContext<'a> {
async fn capability(self) -> Result<(Response<'a>, flow::Transition)> {
let capabilities: NonEmptyVec<Capability> =
(vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?;
let res = Response::build()
.to_req(self.req)
.message("Server capabilities")
.data(Data::Capability(capabilities))
.ok()?;
Ok((res, flow::Transition::None))
}
async fn login(
self,
username: &AString<'a>,
password: &Secret<AString<'a>>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let (u, p) = (
std::str::from_utf8(username.as_ref())?,
std::str::from_utf8(password.declassify().as_ref())?,

View file

@ -5,7 +5,7 @@ use imap_codec::imap_types::response::{Capability, Data};
use crate::imap::flow;
use crate::imap::response::Response;
pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub(crate) fn capability(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
let capabilities: NonEmptyVec<Capability> =
(vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?;
let res = Response::build()
@ -17,7 +17,7 @@ pub(crate) fn capability<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transi
Ok((res, flow::Transition::None))
}
pub(crate) fn noop_nothing<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub(crate) fn noop_nothing(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build().tag(tag).message("Noop completed.").ok()?,
flow::Transition::None,
@ -41,7 +41,7 @@ pub(crate) fn not_implemented<'a>(
))
}
pub(crate) fn wrong_state<'a>(tag: Tag<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub(crate) fn wrong_state(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.tag(tag)

View file

@ -21,18 +21,18 @@ use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW}
use crate::mail::IMF;
pub struct AuthenticatedContext<'a> {
pub req: &'a Command<'a>,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
}
pub async fn dispatch<'a>(
ctx: AuthenticatedContext<'a>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any state
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
CommandBody::Logout => anystate::logout(),
// Specific to this state (11 commands)
CommandBody::Create { mailbox } => ctx.create(mailbox).await,
@ -68,7 +68,10 @@ pub async fn dispatch<'a>(
// --- PRIVATE ---
impl<'a> AuthenticatedContext<'a> {
async fn create(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
async fn create(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name = match mailbox {
MailboxCodec::Inbox => {
return Ok((
@ -100,7 +103,10 @@ impl<'a> AuthenticatedContext<'a> {
}
}
async fn delete(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
async fn delete(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
match self.user.delete_mailbox(&name).await {
@ -125,7 +131,7 @@ impl<'a> AuthenticatedContext<'a> {
self,
from: &MailboxCodec<'a>,
to: &MailboxCodec<'a>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(from).try_into()?;
let new_name: &str = MailboxName(to).try_into()?;
@ -152,7 +158,7 @@ impl<'a> AuthenticatedContext<'a> {
reference: &MailboxCodec<'a>,
mailbox_wildcard: &ListMailbox<'a>,
is_lsub: bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let mbx_hier_delim: QuotedChar = QuotedChar::unvalidated(MBX_HIER_DELIM_RAW);
let reference: &str = MailboxName(reference).try_into()?;
@ -259,9 +265,9 @@ impl<'a> AuthenticatedContext<'a> {
async fn status(
self,
mailbox: &MailboxCodec<'a>,
mailbox: &MailboxCodec<'static>,
attributes: &[StatusDataItemName],
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(name).await?;
let mb = match mb_opt {
@ -316,7 +322,7 @@ impl<'a> AuthenticatedContext<'a> {
async fn subscribe(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
@ -341,7 +347,7 @@ impl<'a> AuthenticatedContext<'a> {
async fn unsubscribe(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
@ -399,7 +405,10 @@ impl<'a> AuthenticatedContext<'a> {
* TRACE END ---
*/
async fn select(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
async fn select(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@ -430,7 +439,10 @@ impl<'a> AuthenticatedContext<'a> {
))
}
async fn examine(self, mailbox: &MailboxCodec<'a>) -> Result<(Response<'a>, flow::Transition)> {
async fn examine(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
@ -468,7 +480,7 @@ impl<'a> AuthenticatedContext<'a> {
flags: &[Flag<'a>],
date: &Option<DateTime>,
message: &Literal<'a>,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await {
Ok((_mb, uidvalidity, uid)) => Ok((

View file

@ -14,17 +14,17 @@ use crate::imap::response::Response;
use crate::mail::user::User;
pub struct ExaminedContext<'a> {
pub req: &'a Command<'a>,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
CommandBody::Logout => anystate::logout(),
// Specific to the EXAMINE state (specialization of the SELECTED state)
// ~3 commands -> close, fetch, search + NOOP
@ -58,7 +58,7 @@ pub async fn dispatch<'a>(ctx: ExaminedContext<'a>) -> Result<(Response<'a>, flo
impl<'a> ExaminedContext<'a> {
/// CLOSE in examined state is not the same as in selected state
/// (in selected state it also does an EXPUNGE, here it doesn't)
async fn close(self) -> Result<(Response<'a>, flow::Transition)> {
async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@ -71,9 +71,9 @@ impl<'a> ExaminedContext<'a> {
pub async fn fetch(
self,
sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'a>,
attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::build()
@ -98,7 +98,7 @@ impl<'a> ExaminedContext<'a> {
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@ -108,7 +108,7 @@ impl<'a> ExaminedContext<'a> {
))
}
pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> {
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;

View file

@ -18,17 +18,19 @@ use crate::imap::response::Response;
use crate::mail::user::User;
pub struct SelectedContext<'a> {
pub req: &'a Command<'a>,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flow::Transition)> {
pub async fn dispatch<'a>(
ctx: SelectedContext<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => Ok((Response::bye()?, flow::Transition::Logout)),
CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP)
CommandBody::Close => ctx.close().await,
@ -65,7 +67,7 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response<'a>, flo
// --- PRIVATE ---
impl<'a> SelectedContext<'a> {
async fn close(self) -> Result<(Response<'a>, flow::Transition)> {
async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
// We expunge messages,
// but we don't send the untagged EXPUNGE responses
let tag = self.req.tag.clone();
@ -79,9 +81,9 @@ impl<'a> SelectedContext<'a> {
pub async fn fetch(
self,
sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'a>,
attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::build()
@ -106,7 +108,7 @@ impl<'a> SelectedContext<'a> {
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
@ -116,7 +118,7 @@ impl<'a> SelectedContext<'a> {
))
}
pub async fn noop(self) -> Result<(Response<'a>, flow::Transition)> {
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
@ -130,7 +132,7 @@ impl<'a> SelectedContext<'a> {
))
}
async fn expunge(self) -> Result<(Response<'a>, flow::Transition)> {
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?;
@ -151,7 +153,7 @@ impl<'a> SelectedContext<'a> {
response: &StoreResponse,
flags: &[Flag<'a>],
uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let data = self
.mailbox
.store(sequence_set, kind, response, flags, uid)
@ -172,7 +174,7 @@ impl<'a> SelectedContext<'a> {
sequence_set: &SequenceSet,
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response<'a>, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;

View file

@ -37,23 +37,27 @@ pub enum Transition {
// See RFC3501 section 3.
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State {
pub fn apply(self, tr: Transition) -> Result<Self, Error> {
match (self, tr) {
(s, Transition::None) => Ok(s),
(State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)),
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
let new_state = match (&self, tr) {
(_s, Transition::None) => return Ok(()),
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Select(m),
) => Ok(State::Selected(u, m)),
) => State::Selected(u.clone(), m),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Examine(m),
) => Ok(State::Examined(u, m)),
) => State::Examined(u.clone(), m),
(State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
Ok(State::Authenticated(u))
State::Authenticated(u.clone())
}
(_, Transition::Logout) => Ok(State::Logout),
_ => Err(Error::ForbiddenTransition),
}
(_, Transition::Logout) => State::Logout,
_ => return Err(Error::ForbiddenTransition),
};
*self = new_state;
Ok(())
}
}

View file

@ -75,14 +75,26 @@ impl<'a> FetchedMail<'a> {
}
}
pub struct AttributesProxy<'a> {
attrs: Vec<MessageDataItemName<'a>>,
pub struct AttributesProxy {
attrs: Vec<MessageDataItemName<'static>>,
}
impl<'a> AttributesProxy<'a> {
fn new(attrs: &'a MacroOrMessageDataItemNames<'a>, is_uid_fetch: bool) -> Self {
impl AttributesProxy {
fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self {
// Expand macros
let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => m.expand(),
MacroOrMessageDataItemNames::Macro(m) => {
use imap_codec::imap_types::fetch::Macro;
use MessageDataItemName::*;
match m {
Macro::All => vec![Flags, InternalDate, Rfc822Size, Envelope],
Macro::Fast => vec![Flags, InternalDate, Rfc822Size],
Macro::Full => vec![Flags, InternalDate, Rfc822Size, Envelope, Body],
_ => {
tracing::error!("unimplemented macro");
vec![]
}
}
}
MacroOrMessageDataItemNames::MessageDataItemNames(a) => a.clone(),
};
@ -248,7 +260,7 @@ impl<'a> MailView<'a> {
Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt)))
}
fn filter<'b>(&self, ap: &AttributesProxy<'b>) -> Result<(Body<'b>, SeenFlag)> {
fn filter<'b>(&self, ap: &AttributesProxy) -> Result<(Body<'static>, SeenFlag)> {
let mut seen = SeenFlag::DoNothing;
let res_attrs = ap
.attrs
@ -593,9 +605,9 @@ impl MailboxView {
pub async fn fetch<'b>(
&self,
sequence_set: &SequenceSet,
attributes: &'b MacroOrMessageDataItemNames<'b>,
attributes: &'b MacroOrMessageDataItemNames<'static>,
is_uid_fetch: &bool,
) -> Result<Vec<Body<'b>>> {
) -> Result<Vec<Body<'static>>> {
let ap = AttributesProxy::new(attributes, *is_uid_fetch);
// Prepare data

View file

@ -4,104 +4,183 @@ mod mailbox_view;
mod response;
mod session;
use std::task::{Context, Poll};
use std::net::SocketAddr;
use anyhow::Result;
//use boitalettres::errors::Error as BalError;
//use boitalettres::proto::{Request, Response};
//use boitalettres::server::accept::addr::AddrIncoming;
//use boitalettres::server::accept::addr::AddrStream;
//use boitalettres::server::Server as ImapServer;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::watch;
use imap_codec::imap_types::response::Greeting;
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
use crate::config::ImapConfig;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
pub struct Server {}
pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> {
unimplemented!();
/* let incoming = AddrIncoming::new(config.bind_addr).await?;
tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr);
let imap = ImapServer::new(incoming).serve(Instance::new(login.clone()));
Ok(Server(imap))*/
}
impl Server {
pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
tracing::info!("IMAP started!");
unimplemented!();
/*tokio::select! {
s = self.0 => s?,
_ = must_exit.changed() => tracing::info!("Stopped IMAP server"),
}
Ok(())*/
}
}
//---
/*
/// Instance is the main Tokio Tower service that we register in BàL.
/// It receives new connection demands and spawn a dedicated service.
struct Instance {
pub struct Server {
bind_addr: SocketAddr,
login_provider: ArcLoginProvider,
}
impl Instance {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self { login_provider }
struct ClientContext {
stream: AnyStream,
addr: SocketAddr,
login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>,
}
pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server {
Server {
bind_addr: config.bind_addr,
login_provider: login,
}
}
impl<'a> Service<&'a AddrStream> for Instance {
type Response = Connection;
type Error = anyhow::Error;
type Future = BoxFuture<'static, Result<Self::Response>>;
impl Server {
pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?;
tracing::info!("IMAP server listening on {:#}", self.bind_addr);
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
let mut connections = FuturesUnordered::new();
fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
let lp = self.login_provider.clone();
async { Ok(Connection::new(lp)) }.boxed()
while !*must_exit.borrow() {
let wait_conn_finished = async {
if connections.is_empty() {
futures::future::pending().await
} else {
connections.next().await
}
};
let (socket, remote_addr) = tokio::select! {
a = tcp.accept() => a?,
_ = wait_conn_finished => continue,
_ = must_exit.changed() => continue,
};
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
};
let conn = tokio::spawn(client_wrapper(client));
connections.push(conn);
}
drop(tcp);
tracing::info!("IMAP server shutting down, draining remaining connections...");
while connections.next().await.is_some() {}
Ok(())
}
}
//---
/// Connection is the per-connection Tokio Tower service we register in BàL.
/// It handles a single TCP connection, and thus has a business logic.
struct Connection {
session: session::Manager,
}
impl Connection {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
session: session::Manager::new(login_provider),
async fn client_wrapper(ctx: ClientContext) {
let addr = ctx.addr.clone();
match client(ctx).await {
Ok(()) => {
tracing::info!("closing successful session for {:?}", addr);
}
Err(e) => {
tracing::error!("closing errored session for {:?}: {}", addr, e);
}
}
}
impl Service<Request> for Connection {
type Response = Response;
type Error = BalError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
async fn client(mut ctx: ClientContext) -> Result<()> {
// Send greeting
let (mut server, _) = ServerFlow::send_greeting(
ctx.stream,
ServerFlowOptions::default(),
Greeting::ok(None, "Aerogramme").unwrap(),
)
.await?;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
use crate::imap::response::{Body, Response as MyResponse};
use crate::imap::session::Instance;
use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::response::{Response, Status};
use tokio::sync::mpsc;
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
let bckgrnd = tokio::spawn(async move {
let mut session = Instance::new(ctx.login_provider);
loop {
let cmd = match cmd_rx.recv().await {
None => break,
Some(cmd_recv) => cmd_recv,
};
let maybe_response = session.command(cmd).await;
match resp_tx.send(maybe_response) {
Err(_) => break,
Ok(_) => (),
};
}
tracing::info!("runner is quitting");
});
// Main loop
loop {
tokio::select! {
// Managing imap_flow stuff
srv_evt = server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
Response::Status(Status::Bye(_)) => break,
_ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
match cmd_tx.try_send(command) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
}
_ => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
}
}
},
},
// Managing response generated by Aerogramme
maybe_msg = resp_rx.recv() => {
let response = match maybe_msg {
None => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
continue
},
Some(r) => r,
};
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => server.enqueue_data(d),
Body::Status(s) => server.enqueue_status(s),
};
}
server.enqueue_status(response.completion);
},
// When receiving a CTRL+C
_ = ctx.must_exit.changed() => {
server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
}
fn call(&mut self, req: Request) -> Self::Future {
tracing::debug!("Got request: {:#?}", req.command);
self.session.process(req)
}
drop(cmd_tx);
bckgrnd.await?;
Ok(())
}
*/

View file

@ -47,11 +47,13 @@ impl<'a> ResponseBuilder<'a> {
self
}
#[allow(dead_code)]
pub fn info(mut self, status: Status<'a>) -> Self {
self.body.push(Body::Status(status));
self
}
#[allow(dead_code)]
pub fn many_info(mut self, status: Vec<Status<'a>>) -> Self {
for d in status.into_iter() {
self = self.info(d);
@ -87,8 +89,8 @@ impl<'a> ResponseBuilder<'a> {
}
pub struct Response<'a> {
body: Vec<Body<'a>>,
completion: Status<'a>,
pub body: Vec<Body<'a>>,
pub completion: Status<'a>,
}
impl<'a> Response<'a> {

View file

@ -1,182 +1,86 @@
use anyhow::Error;
//use boitalettres::errors::Error as BalError;
//use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
/*
/* This constant configures backpressure in the system,
* or more specifically, how many pipelined messages are allowed
* before refusing them
*/
const MAX_PIPELINED_COMMANDS: usize = 10;
struct Message {
req: Request,
tx: oneshot::Sender<Result<Response, BalError>>,
}
use imap_codec::imap_types::command::Command;
//-----
pub struct Manager {
tx: mpsc::Sender<Message>,
}
impl Manager {
pub fn new(login_provider: ArcLoginProvider) -> Self {
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
tokio::spawn(async move {
let instance = Instance::new(login_provider, rx);
instance.start().await;
});
Self { tx }
}
pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
let (tx, rx) = oneshot::channel();
let msg = Message { req, tx };
// We use try_send on a bounded channel to protect the daemons from DoS.
// Pipelining requests in IMAP are a special case: they should not occure often
// and in a limited number (like 3 requests). Someone filling the channel
// will probably be malicious so we "rate limit" them.
match self.tx.try_send(msg) {
Ok(()) => (),
Err(TrySendError::Full(_)) => {
return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed()
}
Err(TrySendError::Closed(_)) => {
return async { Err(BalError::Text("Terminated session".to_string())) }.boxed()
}
};
// @FIXME add a timeout, handle a session that fails.
async {
match rx.await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Got error {:#?}", e);
Response::bad("No response from the session handler")
}
}
}
.boxed()
}
}
*/
//-----
/*
pub struct Instance {
rx: mpsc::Receiver<Message>,
pub login_provider: ArcLoginProvider,
pub state: flow::State,
}
impl Instance {
fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver<Message>) -> Self {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
login_provider,
rx,
state: flow::State::NotAuthenticated,
}
}
//@FIXME add a function that compute the runner's name from its local info
// to ease debug
// fn name(&self) -> String { }
async fn start(mut self) {
//@FIXME add more info about the runner
tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let ctrl = match &mut self.state {
flow::State::NotAuthenticated => {
let ctx = anonymous::AnonymousContext {
req: &msg.req,
login_provider: Some(&self.login_provider),
};
anonymous::dispatch(ctx).await
}
flow::State::Authenticated(ref user) => {
let ctx = authenticated::AuthenticatedContext {
req: &msg.req,
user,
};
authenticated::dispatch(ctx).await
}
flow::State::Selected(ref user, ref mut mailbox) => {
let ctx = selected::SelectedContext {
req: &msg.req,
user,
mailbox,
};
selected::dispatch(ctx).await
}
flow::State::Examined(ref user, ref mut mailbox) => {
let ctx = examined::ExaminedContext {
req: &msg.req,
user,
mailbox,
};
examined::dispatch(ctx).await
}
flow::State::Logout => {
Response::bad("No commands are allowed in the LOGOUT state.")
.map(|r| (r, flow::Transition::None))
.map_err(Error::msg)
}
};
// Process result
let res = match ctrl {
Ok((res, tr)) => {
//@FIXME remove unwrap
self.state = match self.state.apply(tr) {
Ok(new_state) => new_state,
Err(e) => {
tracing::error!("Invalid transition: {}, exiting", e);
break;
}
};
//@FIXME enrich here the command with some global status
Ok(res)
}
// Cast from anyhow::Error to Bal::Error
// @FIXME proper error handling would be great
Err(e) => match e.downcast::<BalError>() {
Ok(be) => Err(be),
Err(e) => {
tracing::warn!(error=%e, "internal.error");
Response::bad("Internal error")
}
},
};
//@FIXME I think we should quit this thread on error and having our manager watch it,
// and then abort the session as it is corrupted.
msg.tx.send(res).unwrap_or_else(|e| {
tracing::warn!("failed to send imap response to manager: {:#?}", e)
});
if let flow::State::Logout = &self.state {
break;
pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let (resp, tr) = match &mut self.state {
flow::State::NotAuthenticated => {
let ctx = anonymous::AnonymousContext {
req: &cmd,
login_provider: &self.login_provider,
};
anonymous::dispatch(ctx).await
}
flow::State::Authenticated(ref user) => {
let ctx = authenticated::AuthenticatedContext { req: &cmd, user };
authenticated::dispatch(ctx).await
}
flow::State::Selected(ref user, ref mut mailbox) => {
let ctx = selected::SelectedContext {
req: &cmd,
user,
mailbox,
};
selected::dispatch(ctx).await
}
flow::State::Examined(ref user, ref mut mailbox) => {
let ctx = examined::ExaminedContext {
req: &cmd,
user,
mailbox,
};
examined::dispatch(ctx).await
}
flow::State::Logout => Response::build()
.tag(cmd.tag.clone())
.message("No commands are allowed in the LOGOUT state.")
.bad()
.map(|r| (r, flow::Transition::None)),
}
.unwrap_or_else(|err| {
tracing::error!("Command error {:?} occured while processing {:?}", err, cmd);
(
Response::build()
.to_req(&cmd)
.message("Internal error while processing command")
.bad()
.unwrap(),
flow::Transition::None,
)
});
if let Err(e) = self.state.apply(tr) {
tracing::error!(
"Transition error {:?} occured while processing on command {:?}",
e,
cmd
);
return Response::build()
.to_req(&cmd)
.message(
"Internal error, processing command triggered an illegal IMAP state transition",
)
.bad()
.unwrap();
}
//@FIXME add more info about the runner
tracing::debug!("exiting runner");
resp
}
}
*/

View file

@ -25,7 +25,7 @@ impl Server {
let login = Arc::new(StaticLoginProvider::new(config.users).await?);
let lmtp_server = None;
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,
imap_server,
@ -42,7 +42,7 @@ impl Server {
};
let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone()));
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,