WIP Refactor, code is broken
This commit is contained in:
parent
41f1b02171
commit
5dd5ae8bcd
13 changed files with 381 additions and 207 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -2,3 +2,5 @@
|
||||||
.vimrc
|
.vimrc
|
||||||
env.sh
|
env.sh
|
||||||
mailrage.toml
|
mailrage.toml
|
||||||
|
*.swo
|
||||||
|
*.swp
|
||||||
|
|
141
src/command.rs
141
src/command.rs
|
@ -1,141 +0,0 @@
|
||||||
use anyhow::{Error, Result};
|
|
||||||
use boitalettres::errors::Error as BalError;
|
|
||||||
use boitalettres::proto::{Request, Response};
|
|
||||||
use imap_codec::types::core::{AString, Tag};
|
|
||||||
use imap_codec::types::fetch_attributes::MacroOrFetchAttributes;
|
|
||||||
use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
|
|
||||||
use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
|
|
||||||
use imap_codec::types::sequence::SequenceSet;
|
|
||||||
|
|
||||||
use crate::mailbox::Mailbox;
|
|
||||||
use crate::session;
|
|
||||||
|
|
||||||
pub struct Command<'a> {
|
|
||||||
tag: Tag,
|
|
||||||
session: &'a mut session::Instance,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Command<'a> {
|
|
||||||
pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self {
|
|
||||||
Self { tag, session }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn capability(&self) -> Result<Response> {
|
|
||||||
let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
|
|
||||||
let res = vec![
|
|
||||||
ImapRes::Data(Data::Capability(capabilities)),
|
|
||||||
ImapRes::Status(
|
|
||||||
Status::ok(Some(self.tag.clone()), None, "Server capabilities")
|
|
||||||
.map_err(Error::msg)?,
|
|
||||||
),
|
|
||||||
];
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn login(&mut self, username: AString, password: AString) -> Result<Response> {
|
|
||||||
let (u, p) = (String::try_from(username)?, String::try_from(password)?);
|
|
||||||
tracing::info!(user = %u, "command.login");
|
|
||||||
|
|
||||||
let creds = match self.session.login_provider.login(&u, &p).await {
|
|
||||||
Err(e) => {
|
|
||||||
tracing::debug!(error=%e, "authentication failed");
|
|
||||||
return Ok(vec![ImapRes::Status(
|
|
||||||
Status::no(Some(self.tag.clone()), None, "Authentication failed")
|
|
||||||
.map_err(Error::msg)?,
|
|
||||||
)]);
|
|
||||||
}
|
|
||||||
Ok(c) => c,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.session.user = Some(session::User {
|
|
||||||
creds,
|
|
||||||
name: u.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
tracing::info!(username=%u, "connected");
|
|
||||||
Ok(vec![
|
|
||||||
//@FIXME we could send a capability status here too
|
|
||||||
ImapRes::Status(
|
|
||||||
Status::ok(Some(self.tag.clone()), None, "completed").map_err(Error::msg)?,
|
|
||||||
),
|
|
||||||
])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn lsub(
|
|
||||||
&self,
|
|
||||||
reference: MailboxCodec,
|
|
||||||
mailbox_wildcard: ListMailbox,
|
|
||||||
) -> Result<Response> {
|
|
||||||
Ok(vec![ImapRes::Status(
|
|
||||||
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
|
||||||
)])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn list(
|
|
||||||
&self,
|
|
||||||
reference: MailboxCodec,
|
|
||||||
mailbox_wildcard: ListMailbox,
|
|
||||||
) -> Result<Response> {
|
|
||||||
Ok(vec![ImapRes::Status(
|
|
||||||
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
|
||||||
)])
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TRACE BEGIN ---
|
|
||||||
|
|
||||||
|
|
||||||
Example: C: A142 SELECT INBOX
|
|
||||||
S: * 172 EXISTS
|
|
||||||
S: * 1 RECENT
|
|
||||||
S: * OK [UNSEEN 12] Message 12 is first unseen
|
|
||||||
S: * OK [UIDVALIDITY 3857529045] UIDs valid
|
|
||||||
S: * OK [UIDNEXT 4392] Predicted next UID
|
|
||||||
S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
|
|
||||||
S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
|
|
||||||
S: A142 OK [READ-WRITE] SELECT completed
|
|
||||||
|
|
||||||
* TRACE END ---
|
|
||||||
*/
|
|
||||||
pub async fn select(&mut self, mailbox: MailboxCodec) -> Result<Response> {
|
|
||||||
let name = String::try_from(mailbox)?;
|
|
||||||
let user = match self.session.user.as_ref() {
|
|
||||||
Some(u) => u,
|
|
||||||
_ => {
|
|
||||||
return Ok(vec![ImapRes::Status(
|
|
||||||
Status::no(Some(self.tag.clone()), None, "Not implemented")
|
|
||||||
.map_err(Error::msg)?,
|
|
||||||
)])
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut mb = Mailbox::new(&user.creds, name.clone())?;
|
|
||||||
tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected");
|
|
||||||
|
|
||||||
let sum = mb.summary().await?;
|
|
||||||
tracing::trace!(summary=%sum, "mailbox.summary");
|
|
||||||
|
|
||||||
let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)];
|
|
||||||
|
|
||||||
self.session.selected = Some(mb);
|
|
||||||
Ok(vec![ImapRes::Status(
|
|
||||||
Status::ok(
|
|
||||||
Some(self.tag.clone()),
|
|
||||||
Some(Code::ReadWrite),
|
|
||||||
"Select completed",
|
|
||||||
)
|
|
||||||
.map_err(Error::msg)?,
|
|
||||||
)])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch(
|
|
||||||
&self,
|
|
||||||
sequence_set: SequenceSet,
|
|
||||||
attributes: MacroOrFetchAttributes,
|
|
||||||
uid: bool,
|
|
||||||
) -> Result<Response> {
|
|
||||||
Ok(vec![ImapRes::Status(
|
|
||||||
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
|
||||||
)])
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -16,6 +16,7 @@ pub struct Config {
|
||||||
pub login_ldap: Option<LoginLdapConfig>,
|
pub login_ldap: Option<LoginLdapConfig>,
|
||||||
|
|
||||||
pub lmtp: Option<LmtpConfig>,
|
pub lmtp: Option<LmtpConfig>,
|
||||||
|
pub imap: Option<ImapConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
@ -71,6 +72,11 @@ pub struct LmtpConfig {
|
||||||
pub hostname: String,
|
pub hostname: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct ImapConfig {
|
||||||
|
pub bind_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config> {
|
||||||
let mut file = std::fs::OpenOptions::new()
|
let mut file = std::fs::OpenOptions::new()
|
||||||
.read(true)
|
.read(true)
|
||||||
|
|
29
src/imap/command.rs
Normal file
29
src/imap/command.rs
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
use anyhow::{Error, Result};
|
||||||
|
use boitalettres::errors::Error as BalError;
|
||||||
|
use boitalettres::proto::{Request, Response};
|
||||||
|
use imap_codec::types::core::{AString, Tag};
|
||||||
|
use imap_codec::types::fetch_attributes::MacroOrFetchAttributes;
|
||||||
|
use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
|
||||||
|
use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, Status};
|
||||||
|
use imap_codec::types::sequence::SequenceSet;
|
||||||
|
|
||||||
|
use crate::mailbox::Mailbox;
|
||||||
|
use crate::session;
|
||||||
|
|
||||||
|
pub struct Command<'a> {
|
||||||
|
tag: Tag,
|
||||||
|
session: &'a mut session::Instance,
|
||||||
|
}
|
||||||
|
|
||||||
|
// @FIXME better handle errors, our conversions are bad due to my fork of BàL
|
||||||
|
// @FIXME store the IMAP state in the session as an enum.
|
||||||
|
impl<'a> Command<'a> {
|
||||||
|
pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self {
|
||||||
|
Self { tag, session }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
60
src/imap/command/anonymous.rs
Normal file
60
src/imap/command/anonymous.rs
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
|
||||||
|
use boitalettres::proto::{Request, Response};
|
||||||
|
use crate::login::ArcLoginProvider;
|
||||||
|
use crate::imap::Context;
|
||||||
|
|
||||||
|
//--- dispatching
|
||||||
|
|
||||||
|
pub async fn dispatch(ctx: Context) -> Result<Response> {
|
||||||
|
match ctx.req.body {
|
||||||
|
CommandBody::Capability => anonymous::capability(ctx).await,
|
||||||
|
CommandBody::Login { username, password } => anonymous::login(ctx, username, password).await,
|
||||||
|
_ => Status::no(Some(ctx.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(Error::msg),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//--- Command controllers
|
||||||
|
|
||||||
|
pub async fn capability(ctx: Context) -> Result<Response> {
|
||||||
|
let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
|
||||||
|
let res = vec![
|
||||||
|
ImapRes::Data(Data::Capability(capabilities)),
|
||||||
|
ImapRes::Status(
|
||||||
|
Status::ok(Some(ctx.req.tag.clone()), None, "Server capabilities")
|
||||||
|
.map_err(Error::msg)?,
|
||||||
|
),
|
||||||
|
];
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn login(ctx: Context, username: AString, password: AString) -> Result<Response> {
|
||||||
|
let (u, p) = (String::try_from(username)?, String::try_from(password)?);
|
||||||
|
tracing::info!(user = %u, "command.login");
|
||||||
|
|
||||||
|
let creds = match ctx.login_provider.login(&u, &p).await {
|
||||||
|
Err(e) => {
|
||||||
|
tracing::debug!(error=%e, "authentication failed");
|
||||||
|
return Ok(vec![ImapRes::Status(
|
||||||
|
Status::no(Some(ctx.req.tag.clone()), None, "Authentication failed")
|
||||||
|
.map_err(Error::msg)?,
|
||||||
|
)]);
|
||||||
|
}
|
||||||
|
Ok(c) => c,
|
||||||
|
};
|
||||||
|
|
||||||
|
let user = flow::User {
|
||||||
|
creds,
|
||||||
|
name: u.clone(),
|
||||||
|
};
|
||||||
|
ctx.state.authenticate(user)?;
|
||||||
|
|
||||||
|
tracing::info!(username=%u, "connected");
|
||||||
|
Ok(vec![
|
||||||
|
//@FIXME we could send a capability status here too
|
||||||
|
ImapRes::Status(
|
||||||
|
Status::ok(Some(ctx.req.tag.clone()), None, "completed").map_err(Error::msg)?,
|
||||||
|
),
|
||||||
|
])
|
||||||
|
}
|
91
src/imap/command/authenticated.rs
Normal file
91
src/imap/command/authenticated.rs
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
pub async fn dispatch(ctx: Context) -> Result<Response> {
|
||||||
|
match req.body {
|
||||||
|
CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
|
||||||
|
CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await,
|
||||||
|
CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await,
|
||||||
|
CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| {
|
||||||
|
self.state.select(mailbox);
|
||||||
|
Ok(response)
|
||||||
|
}),
|
||||||
|
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(Error::msg),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn lsub(
|
||||||
|
&self,
|
||||||
|
reference: MailboxCodec,
|
||||||
|
mailbox_wildcard: ListMailbox,
|
||||||
|
) -> Result<Response> {
|
||||||
|
Ok(vec![ImapRes::Status(
|
||||||
|
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
||||||
|
)])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list(
|
||||||
|
&self,
|
||||||
|
reference: MailboxCodec,
|
||||||
|
mailbox_wildcard: ListMailbox,
|
||||||
|
) -> Result<Response> {
|
||||||
|
Ok(vec![ImapRes::Status(
|
||||||
|
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
||||||
|
)])
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TRACE BEGIN ---
|
||||||
|
|
||||||
|
|
||||||
|
Example: C: A142 SELECT INBOX
|
||||||
|
S: * 172 EXISTS
|
||||||
|
S: * 1 RECENT
|
||||||
|
S: * OK [UNSEEN 12] Message 12 is first unseen
|
||||||
|
S: * OK [UIDVALIDITY 3857529045] UIDs valid
|
||||||
|
S: * OK [UIDNEXT 4392] Predicted next UID
|
||||||
|
S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft)
|
||||||
|
S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited
|
||||||
|
S: A142 OK [READ-WRITE] SELECT completed
|
||||||
|
|
||||||
|
* TRACE END ---
|
||||||
|
*/
|
||||||
|
pub async fn select(&mut self, mailbox: MailboxCodec) -> Result<Response> {
|
||||||
|
let name = String::try_from(mailbox)?;
|
||||||
|
let user = match self.session.user.as_ref() {
|
||||||
|
Some(u) => u,
|
||||||
|
_ => {
|
||||||
|
return Ok(vec![ImapRes::Status(
|
||||||
|
Status::no(Some(self.tag.clone()), None, "Not implemented")
|
||||||
|
.map_err(Error::msg)?,
|
||||||
|
)])
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut mb = Mailbox::new(&user.creds, name.clone())?;
|
||||||
|
tracing::info!(username=%user.name, mailbox=%name, "mailbox.selected");
|
||||||
|
|
||||||
|
let sum = mb.summary().await?;
|
||||||
|
tracing::trace!(summary=%sum, "mailbox.summary");
|
||||||
|
|
||||||
|
let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)];
|
||||||
|
|
||||||
|
self.session.selected = Some(mb);
|
||||||
|
|
||||||
|
let r_unseen = Status::ok(None, Some(Code::Unseen(0)), "").map_err(Error::msg)?;
|
||||||
|
let r_permanentflags = Status::ok(None, Some(Code::
|
||||||
|
|
||||||
|
Ok(vec![
|
||||||
|
ImapRes::Data(Data::Exists(0)),
|
||||||
|
ImapRes::Data(Data::Recent(0)),
|
||||||
|
ImapRes::Data(Data::Flags(vec![]),
|
||||||
|
ImapRes::Status(),
|
||||||
|
ImapRes::Status(),
|
||||||
|
ImapRes::Status()
|
||||||
|
Status::ok(
|
||||||
|
Some(self.tag.clone()),
|
||||||
|
Some(Code::ReadWrite),
|
||||||
|
"Select completed",
|
||||||
|
)
|
||||||
|
.map_err(Error::msg)?,
|
||||||
|
)])
|
||||||
|
}
|
10
src/imap/command/selected.rs
Normal file
10
src/imap/command/selected.rs
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
pub async fn fetch(
|
||||||
|
&self,
|
||||||
|
sequence_set: SequenceSet,
|
||||||
|
attributes: MacroOrFetchAttributes,
|
||||||
|
uid: bool,
|
||||||
|
) -> Result<Response> {
|
||||||
|
Ok(vec![ImapRes::Status(
|
||||||
|
Status::bad(Some(self.tag.clone()), None, "Not implemented").map_err(Error::msg)?,
|
||||||
|
)])
|
||||||
|
}
|
50
src/imap/flow.rs
Normal file
50
src/imap/flow.rs
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
use crate::mailbox::Mailbox;
|
||||||
|
|
||||||
|
pub struct User {
|
||||||
|
pub name: String,
|
||||||
|
pub creds: Credentials,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum State {
|
||||||
|
NotAuthenticated,
|
||||||
|
Authenticated(User),
|
||||||
|
Selected(User, Mailbox),
|
||||||
|
Logout
|
||||||
|
}
|
||||||
|
pub enum Error {
|
||||||
|
ForbiddenTransition,
|
||||||
|
}
|
||||||
|
|
||||||
|
// See RFC3501 section 3.
|
||||||
|
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
|
||||||
|
impl State {
|
||||||
|
pub fn authenticate(&mut self, user: User) -> Result<(), Error> {
|
||||||
|
self = match state {
|
||||||
|
State::NotAuthenticated => State::Authenticated(user),
|
||||||
|
_ => return Err(ForbiddenTransition),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn logout(&mut self) -> Self {
|
||||||
|
self = State::Logout;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn select(&mut self, mailbox: Mailbox) -> Result<(), Error> {
|
||||||
|
self = match state {
|
||||||
|
State::Authenticated(user) => State::Selected(user, mailbox),
|
||||||
|
_ => return Err(ForbiddenTransition),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unselect(state: State) -> Result<(), Error> {
|
||||||
|
self = match state {
|
||||||
|
State::Selected(user, _) => State::Authenticated(user),
|
||||||
|
_ => return Err(ForbiddenTransition),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
mod session;
|
||||||
|
mod flow;
|
||||||
|
mod command;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
@ -5,14 +9,44 @@ use anyhow::Result;
|
||||||
use boitalettres::errors::Error as BalError;
|
use boitalettres::errors::Error as BalError;
|
||||||
use boitalettres::proto::{Request, Response};
|
use boitalettres::proto::{Request, Response};
|
||||||
use boitalettres::server::accept::addr::AddrStream;
|
use boitalettres::server::accept::addr::AddrStream;
|
||||||
|
use boitalettres::server::Server as ImapServer;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use futures::future::FutureExt;
|
use futures::future::FutureExt;
|
||||||
use tower::Service;
|
use tower::Service;
|
||||||
|
|
||||||
use crate::session;
|
|
||||||
use crate::LoginProvider;
|
use crate::LoginProvider;
|
||||||
|
|
||||||
pub struct Instance {
|
/// Server is a thin wrapper to register our Services in BàL
|
||||||
|
pub struct Server(ImapServer<AddrIncoming, service::Instance>);
|
||||||
|
pub async fn new(
|
||||||
|
config: ImapConfig,
|
||||||
|
login: Arc<dyn LoginProvider + Send + Sync>,
|
||||||
|
) -> Result<Server> {
|
||||||
|
|
||||||
|
//@FIXME add a configuration parameter
|
||||||
|
let incoming = AddrIncoming::new(config.bind_addr).await?;
|
||||||
|
let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone()));
|
||||||
|
|
||||||
|
tracing::info!("IMAP activated, will listen on {:#}", self.imap.incoming.local_addr);
|
||||||
|
Server(imap)
|
||||||
|
}
|
||||||
|
impl Server {
|
||||||
|
pub async fn run(&self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
|
||||||
|
tracing::info!("IMAP started!");
|
||||||
|
tokio::select! {
|
||||||
|
s = self => s?,
|
||||||
|
_ = must_exit.changed() => tracing::info!("Stopped IMAP server"),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---
|
||||||
|
|
||||||
|
/// Instance is the main Tokio Tower service that we register in BàL.
|
||||||
|
/// It receives new connection demands and spawn a dedicated service.
|
||||||
|
struct Instance {
|
||||||
login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
||||||
}
|
}
|
||||||
impl Instance {
|
impl Instance {
|
||||||
|
@ -36,7 +70,11 @@ impl<'a> Service<&'a AddrStream> for Instance {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Connection {
|
//---
|
||||||
|
|
||||||
|
/// Connection is the per-connection Tokio Tower service we register in BàL.
|
||||||
|
/// It handles a single TCP connection, and thus has a business logic.
|
||||||
|
struct Connection {
|
||||||
session: session::Manager,
|
session: session::Manager,
|
||||||
}
|
}
|
||||||
impl Connection {
|
impl Connection {
|
|
@ -10,7 +10,7 @@ use imap_codec::types::response::{Capability, Code, Data, Response as ImapRes, S
|
||||||
use tokio::sync::mpsc::error::TrySendError;
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::command;
|
use crate::command::{anonymous,authenticated,selected};
|
||||||
use crate::login::Credentials;
|
use crate::login::Credentials;
|
||||||
use crate::mailbox::Mailbox;
|
use crate::mailbox::Mailbox;
|
||||||
use crate::LoginProvider;
|
use crate::LoginProvider;
|
||||||
|
@ -26,13 +26,14 @@ struct Message {
|
||||||
tx: oneshot::Sender<Result<Response, BalError>>,
|
tx: oneshot::Sender<Result<Response, BalError>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-----
|
||||||
|
|
||||||
pub struct Manager {
|
pub struct Manager {
|
||||||
tx: mpsc::Sender<Message>,
|
tx: mpsc::Sender<Message>,
|
||||||
}
|
}
|
||||||
|
|
||||||
//@FIXME we should garbage collect the Instance when the Manager is destroyed.
|
|
||||||
impl Manager {
|
impl Manager {
|
||||||
pub fn new(login_provider: Arc<dyn LoginProvider + Send + Sync>) -> Self {
|
pub fn new(login_provider: ArcLoginProvider) -> Self {
|
||||||
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
|
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut instance = Instance::new(login_provider, rx);
|
let mut instance = Instance::new(login_provider, rx);
|
||||||
|
@ -86,28 +87,29 @@ impl Manager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct User {
|
//-----
|
||||||
pub name: String,
|
|
||||||
pub creds: Credentials,
|
pub struct Context<'a> {
|
||||||
|
req: &'a Request,
|
||||||
|
state: &'a mut flow::State,
|
||||||
|
login: ArcLoginProvider,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Instance {
|
pub struct Instance {
|
||||||
rx: mpsc::Receiver<Message>,
|
rx: mpsc::Receiver<Message>,
|
||||||
|
|
||||||
pub login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
pub login_provider: ArcLoginProvider,
|
||||||
pub selected: Option<Mailbox>,
|
pub state: flow::State,
|
||||||
pub user: Option<User>,
|
|
||||||
}
|
}
|
||||||
impl Instance {
|
impl Instance {
|
||||||
fn new(
|
fn new(
|
||||||
login_provider: Arc<dyn LoginProvider + Send + Sync>,
|
login_provider: ArcLoginProvider,
|
||||||
rx: mpsc::Receiver<Message>,
|
rx: mpsc::Receiver<Message>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
login_provider,
|
login_provider,
|
||||||
rx,
|
rx,
|
||||||
selected: None,
|
state: flow::State::NotAuthenticated,
|
||||||
user: None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,42 +122,75 @@ impl Instance {
|
||||||
tracing::debug!("starting runner");
|
tracing::debug!("starting runner");
|
||||||
|
|
||||||
while let Some(msg) = self.rx.recv().await {
|
while let Some(msg) = self.rx.recv().await {
|
||||||
let mut cmd = command::Command::new(msg.req.tag.clone(), self);
|
let ctx = Context { req: &msg.req, state: &mut self.state, login: self.login_provider };
|
||||||
let res = match msg.req.body {
|
|
||||||
CommandBody::Capability => cmd.capability().await,
|
// Command behavior is modulated by the state.
|
||||||
CommandBody::Login { username, password } => cmd.login(username, password).await,
|
// To prevent state error, we handle the same command in separate code path depending
|
||||||
CommandBody::Lsub {
|
// on the State.
|
||||||
reference,
|
let cmd_res = match self.state {
|
||||||
mailbox_wildcard,
|
flow::State::NotAuthenticated => anonymous::dispatch(ctx).await,
|
||||||
} => cmd.lsub(reference, mailbox_wildcard).await,
|
flow::State::Authenticated(user) => authenticated::dispatch(ctx).await,
|
||||||
CommandBody::List {
|
flow::State::Selected(user, mailbox) => selected::dispatch(ctx).await,
|
||||||
reference,
|
flow::State::Logout => Status::bad(Some(ctx.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
|
||||||
mailbox_wildcard,
|
|
||||||
} => cmd.list(reference, mailbox_wildcard).await,
|
|
||||||
CommandBody::Select { mailbox } => cmd.select(mailbox).await,
|
|
||||||
CommandBody::Fetch {
|
|
||||||
sequence_set,
|
|
||||||
attributes,
|
|
||||||
uid,
|
|
||||||
} => cmd.fetch(sequence_set, attributes, uid).await,
|
|
||||||
_ => Status::bad(Some(msg.req.tag.clone()), None, "Unknown command")
|
|
||||||
.map(|s| vec![ImapRes::Status(s)])
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
.map_err(Error::msg),
|
.map_err(Error::msg),
|
||||||
};
|
};
|
||||||
|
|
||||||
let wrapped_res = res.or_else(|e| match e.downcast::<BalError>() {
|
/*
|
||||||
Ok(be) => Err(be),
|
|
||||||
Err(ae) => {
|
match req.body {
|
||||||
tracing::warn!(error=%ae, "internal.error");
|
CommandBody::Capability => anonymous::capability().await,
|
||||||
Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
|
CommandBody::Login { username, password } => anonymous::login(self.login_provider, username, password).await.and_then(|(user, response)| {
|
||||||
|
self.state.authenticate(user)?;
|
||||||
|
Ok(response)
|
||||||
|
},
|
||||||
|
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the ANONYMOUS state.")
|
||||||
.map(|s| vec![ImapRes::Status(s)])
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
.map_err(|e| BalError::Text(e.to_string()))
|
.map_err(Error::msg),
|
||||||
|
|
||||||
|
},
|
||||||
|
flow::State::Authenticated(user) => match req.body {
|
||||||
|
CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
|
||||||
|
CommandBody::Lsub { reference, mailbox_wildcard, } => authenticated::lsub(reference, mailbox_wildcard).await,
|
||||||
|
CommandBody::List { reference, mailbox_wildcard, } => authenticated::list(reference, mailbox_wildcard).await,
|
||||||
|
CommandBody::Select { mailbox } => authenticated::select(user, mailbox).await.and_then(|(mailbox, response)| {
|
||||||
|
self.state.select(mailbox);
|
||||||
|
Ok(response)
|
||||||
|
}),
|
||||||
|
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the AUTHENTICATED state.")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(Error::msg),
|
||||||
|
},
|
||||||
|
flow::State::Selected(user, mailbox) => match req.body {
|
||||||
|
CommandBody::Capability => anonymous::capability().await, // we use the same implem for now
|
||||||
|
CommandBody::Fetch { sequence_set, attributes, uid, } => selected::fetch(sequence_set, attributes, uid).await,
|
||||||
|
_ => Status::no(Some(msg.req.tag.clone()), None, "This command is not available in the SELECTED state.")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(Error::msg),
|
||||||
|
},
|
||||||
|
flow::State::Logout => Status::bad(Some(msg.req.tag.clone()), None, "No commands are allowed in the LOGOUT state.")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(Error::msg),
|
||||||
}
|
}
|
||||||
});
|
*/
|
||||||
|
|
||||||
|
let imap_res = match cmd_res {
|
||||||
|
Ok(new_state, imap_res) => {
|
||||||
|
self.state = new_state;
|
||||||
|
Ok(imap_res)
|
||||||
|
},
|
||||||
|
Err(e) if Ok(be) = e.downcast::<BalError>() => Err(be),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error=%e, "internal.error");
|
||||||
|
Ok(Status::bad(Some(msg.req.tag.clone()), None, "Internal error")
|
||||||
|
.map(|s| vec![ImapRes::Status(s)])
|
||||||
|
.map_err(|e| BalError::Text(e.to_string())))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
//@FIXME I think we should quit this thread on error and having our manager watch it,
|
//@FIXME I think we should quit this thread on error and having our manager watch it,
|
||||||
// and then abort the session as it is corrupted.
|
// and then abort the session as it is corrupted.
|
||||||
msg.tx.send(wrapped_res).unwrap_or_else(|e| {
|
msg.tx.send(imap_res).unwrap_or_else(|e| {
|
||||||
tracing::warn!("failed to send imap response to manager: {:#?}", e)
|
tracing::warn!("failed to send imap response to manager: {:#?}", e)
|
||||||
});
|
});
|
||||||
}
|
}
|
|
@ -2,6 +2,7 @@ pub mod ldap_provider;
|
||||||
pub mod static_provider;
|
pub mod static_provider;
|
||||||
|
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Context, Result};
|
use anyhow::{anyhow, bail, Context, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -29,6 +30,10 @@ pub trait LoginProvider {
|
||||||
async fn public_login(&self, email: &str) -> Result<PublicCredentials>;
|
async fn public_login(&self, email: &str) -> Result<PublicCredentials>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// ArcLoginProvider is simply an alias on a structure that is used
|
||||||
|
/// in many places in the code
|
||||||
|
pub type ArcLoginProvider = Arc<dyn LoginProvider + Send + Sync>;
|
||||||
|
|
||||||
/// The struct Credentials represent all of the necessary information to interact
|
/// The struct Credentials represent all of the necessary information to interact
|
||||||
/// with a user account's data after they are logged in.
|
/// with a user account's data after they are logged in.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
|
@ -1,14 +1,12 @@
|
||||||
mod bayou;
|
mod bayou;
|
||||||
mod command;
|
|
||||||
mod config;
|
mod config;
|
||||||
mod cryptoblob;
|
mod cryptoblob;
|
||||||
|
mod imap;
|
||||||
mod lmtp;
|
mod lmtp;
|
||||||
mod login;
|
mod login;
|
||||||
mod mail_ident;
|
mod mail_ident;
|
||||||
mod mailbox;
|
mod mailbox;
|
||||||
mod server;
|
mod server;
|
||||||
mod service;
|
|
||||||
mod session;
|
|
||||||
mod time;
|
mod time;
|
||||||
mod uidindex;
|
mod uidindex;
|
||||||
|
|
||||||
|
|
|
@ -15,31 +15,24 @@ use crate::config::*;
|
||||||
use crate::lmtp::*;
|
use crate::lmtp::*;
|
||||||
use crate::login::{ldap_provider::*, static_provider::*, *};
|
use crate::login::{ldap_provider::*, static_provider::*, *};
|
||||||
use crate::mailbox::Mailbox;
|
use crate::mailbox::Mailbox;
|
||||||
use crate::service;
|
use crate::imap;
|
||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
lmtp_server: Option<Arc<LmtpServer>>,
|
lmtp_server: Option<Arc<LmtpServer>>,
|
||||||
imap_server: ImapServer<AddrIncoming, service::Instance>,
|
imap_server: Option<imap::Server>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
pub async fn new(config: Config) -> Result<Self> {
|
pub async fn new(config: Config) -> Result<Self> {
|
||||||
let lmtp_config = config.lmtp.clone(); //@FIXME
|
let (login, lmtp_conf, imap_conf) = build(config)?;
|
||||||
let login = authenticator(config)?;
|
|
||||||
|
|
||||||
let lmtp = lmtp_config.map(|cfg| LmtpServer::new(cfg, login.clone()));
|
let lmtp_server = lmtp_conf.map(|cfg| LmtpServer::new(cfg, login.clone()));
|
||||||
|
let imap_server = imap_conf.map(|cfg| imap::new(cfg, login.clone()));
|
||||||
|
|
||||||
let incoming = AddrIncoming::new("127.0.0.1:4567").await?;
|
Ok(Self { lmtp_server, imap_server })
|
||||||
let imap = ImapServer::new(incoming).serve(service::Instance::new(login.clone()));
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
lmtp_server: lmtp,
|
|
||||||
imap_server: imap,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(self) -> Result<()> {
|
pub async fn run(self) -> Result<()> {
|
||||||
//tracing::info!("Starting server on {:#}", self.imap.incoming.local_addr);
|
|
||||||
tracing::info!("Starting Aerogramme...");
|
tracing::info!("Starting Aerogramme...");
|
||||||
|
|
||||||
let (exit_signal, provoke_exit) = watch_ctrl_c();
|
let (exit_signal, provoke_exit) = watch_ctrl_c();
|
||||||
|
@ -55,14 +48,11 @@ impl Server {
|
||||||
Some(s) => s.run(exit_signal.clone()).await,
|
Some(s) => s.run(exit_signal.clone()).await,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
//@FIXME handle ctrl + c
|
|
||||||
async {
|
async {
|
||||||
let mut must_exit = exit_signal.clone();
|
match self.imap_server.as_ref() {
|
||||||
tokio::select! {
|
None => Ok(()),
|
||||||
s = self.imap_server => s?,
|
Some(s) => s.run(exit_signal.clone()).await,
|
||||||
_ = must_exit.changed() => tracing::info!("IMAP server received CTRL+C, exiting."),
|
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
@ -70,7 +60,7 @@ impl Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>> {
|
fn build(config: Config) -> Result<(Arc<dyn LoginProvider + Send + Sync>, Option<LmtpConfig>, Option<ImapConfig>> {
|
||||||
let s3_region = Region::Custom {
|
let s3_region = Region::Custom {
|
||||||
name: config.aws_region.clone(),
|
name: config.aws_region.clone(),
|
||||||
endpoint: config.s3_endpoint,
|
endpoint: config.s3_endpoint,
|
||||||
|
@ -88,7 +78,8 @@ fn authenticator(config: Config) -> Result<Arc<dyn LoginProvider + Send + Sync>>
|
||||||
}
|
}
|
||||||
(None, None) => bail!("No login provider is set up in config file"),
|
(None, None) => bail!("No login provider is set up in config file"),
|
||||||
};
|
};
|
||||||
Ok(lp)
|
|
||||||
|
Ok(lp, self.lmtp_config, self.imap_config)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
|
pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) {
|
||||||
|
|
Loading…
Reference in a new issue