Some refactoring on mailbox structures and views

This commit is contained in:
Alex 2022-06-29 15:39:54 +02:00
parent 8b7eb1ca91
commit b95028f89e
Signed by: lx
GPG key ID: 0E496D15096376BE
15 changed files with 330 additions and 155 deletions

20
Cargo.lock generated
View file

@ -689,6 +689,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.7.1"
@ -1259,6 +1268,16 @@ dependencies = [
"value-bag",
]
[[package]]
name = "mail-parser"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c46a841ae5276aba5218ade7bb76896358f9f95a925c7b3deea6a0ec0fb8e2a7"
dependencies = [
"encoding_rs",
"serde",
]
[[package]]
name = "mailrage"
version = "0.0.1"
@ -1279,6 +1298,7 @@ dependencies = [
"lazy_static",
"ldap3",
"log",
"mail-parser",
"pretty_env_logger",
"rand",
"rmp-serde",

View file

@ -20,6 +20,7 @@ itertools = "0.10"
lazy_static = "1.4"
ldap3 = { version = "0.10", default-features = false, features = ["tls"] }
log = "0.4"
mail-parser = "0.4.8"
pretty_env_logger = "0.4"
rusoto_core = "0.48.0"
rusoto_credential = "0.48.0"

View file

@ -8,18 +8,11 @@ use imap_codec::types::response::{Code, Data, Status};
use crate::imap::command::anonymous;
use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::mailbox::Mailbox;
use crate::mail::user::User;
const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Seen,
Flag::Answered,
Flag::Flagged,
Flag::Deleted,
Flag::Draft,
];
pub struct AuthenticatedContext<'a> {
pub req: &'a Request,
pub user: &'a User,
@ -96,59 +89,24 @@ impl<'a> AuthenticatedContext<'a> {
async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
let mut mb = self.user.open_mailbox(name)?;
let mb_opt = self.user.open_mailbox(&name).await?;
let mb = match mb_opt {
Some(mb) => mb,
None => {
return Ok((
Response::no("Mailbox does not exist")?,
flow::Transition::None,
))
}
};
tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected");
let sum = mb.summary().await?;
tracing::trace!(summary=%sum, "mailbox.summary");
let mut res = Vec::<Body>::new();
res.push(Body::Data(Data::Exists(sum.exists)));
res.push(Body::Data(Data::Recent(sum.recent)));
let mut flags: Vec<Flag> = sum.flags.map(|f| match f.chars().next() {
Some('\\') => None,
Some('$') if f == "$unseen" => None,
Some(_) => match Atom::try_from(f.clone()) {
Err(_) => {
tracing::error!(username=%self.user.username, mailbox=%name, flag=%f, "Unable to encode flag as IMAP atom");
None
},
Ok(a) => Some(Flag::Keyword(a)),
},
None => None,
}).flatten().collect();
flags.extend_from_slice(&DEFAULT_FLAGS);
res.push(Body::Data(Data::Flags(flags.clone())));
let uid_validity = Status::ok(None, Some(Code::UidValidity(sum.validity)), "UIDs valid")
.map_err(Error::msg)?;
res.push(Body::Status(uid_validity));
let next_uid = Status::ok(None, Some(Code::UidNext(sum.next)), "Predict next UID")
.map_err(Error::msg)?;
res.push(Body::Status(next_uid));
if let Some(unseen) = sum.unseen {
let status_unseen =
Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID")
.map_err(Error::msg)?;
res.push(Body::Status(status_unseen));
}
flags.push(Flag::Permanent);
let permanent_flags =
Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted")
.map_err(Error::msg)?;
res.push(Body::Status(permanent_flags));
let (mb, data) = MailboxView::new(mb).await?;
Ok((
Response::ok("Select completed")?
.with_extra_code(Code::ReadWrite)
.with_body(res),
.with_body(data),
flow::Transition::Select(mb),
))
}

View file

@ -9,6 +9,7 @@ use imap_codec::types::sequence::SequenceSet;
use crate::imap::command::authenticated;
use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::mailbox::Mailbox;
use crate::mail::user::User;
@ -16,7 +17,7 @@ use crate::mail::user::User;
pub struct SelectedContext<'a> {
pub req: &'a Request,
pub user: &'a User,
pub mailbox: &'a mut Mailbox,
pub mailbox: &'a mut MailboxView,
}
pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::Transition)> {

View file

@ -1,6 +1,7 @@
use std::error::Error as StdError;
use std::fmt;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::mailbox::Mailbox;
use crate::mail::user::User;
@ -18,14 +19,14 @@ impl StdError for Error {}
pub enum State {
NotAuthenticated,
Authenticated(User),
Selected(User, Mailbox),
Selected(User, MailboxView),
Logout,
}
pub enum Transition {
None,
Authenticate(User),
Select(Mailbox),
Select(MailboxView),
Unselect,
Logout,
}

154
src/imap/mailbox_view.rs Normal file
View file

@ -0,0 +1,154 @@
use std::sync::Arc;
use anyhow::{Error, Result};
use boitalettres::proto::{res::body::Data as Body, Request, Response};
use imap_codec::types::command::CommandBody;
use imap_codec::types::core::Atom;
use imap_codec::types::flag::Flag;
use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::types::response::{Code, Data, Status};
use crate::mail::mailbox::{Mailbox, Summary};
use crate::mail::uidindex::UidIndex;
const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Seen,
Flag::Answered,
Flag::Flagged,
Flag::Deleted,
Flag::Draft,
];
/// A MailboxView is responsible for giving the client the information
/// it needs about a mailbox, such as an initial summary of the mailbox's
/// content and continuous updates indicating when the content
/// of the mailbox has been changed.
/// To do this, it keeps a variable `known_state` that corresponds to
/// what the client knows, and produces IMAP messages to be sent to the
/// client that go along updates to `known_state`.
pub struct MailboxView {
mailbox: Arc<Mailbox>,
known_state: UidIndex,
}
impl MailboxView {
/// Creates a new IMAP view into a mailbox.
/// Generates the necessary IMAP messages so that the client
/// has a satisfactory summary of the current mailbox's state.
pub async fn new(mailbox: Arc<Mailbox>) -> Result<(Self, Vec<Body>)> {
let state = mailbox.current_uid_index().await;
let new_view = Self {
mailbox,
known_state: state,
};
let mut data = Vec::<Body>::new();
data.push(new_view.exists()?);
data.push(new_view.recent()?);
data.extend(new_view.flags()?.into_iter());
data.push(new_view.uidvalidity()?);
data.push(new_view.uidnext()?);
if let Some(unseen) = new_view.unseen()? {
data.push(unseen);
}
Ok((new_view, data))
}
// ----
/// Produce an OK [UIDVALIDITY _] message corresponding to `known_state`
fn uidvalidity(&self) -> Result<Body> {
let uid_validity = Status::ok(
None,
Some(Code::UidValidity(self.known_state.uidvalidity)),
"UIDs valid",
)
.map_err(Error::msg)?;
Ok(Body::Status(uid_validity))
}
/// Produce an OK [UIDNEXT _] message corresponding to `known_state`
fn uidnext(&self) -> Result<Body> {
let next_uid = Status::ok(
None,
Some(Code::UidNext(self.known_state.uidnext)),
"Predict next UID",
)
.map_err(Error::msg)?;
Ok(Body::Status(next_uid))
}
/// Produces an UNSEEN message (if relevant) corresponding to the
/// first unseen message id in `known_state`
fn unseen(&self) -> Result<Option<Body>> {
let unseen = self
.known_state
.idx_by_flag
.get(&"$unseen".to_string())
.and_then(|os| os.get_min())
.cloned();
if let Some(unseen) = unseen {
let status_unseen =
Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID")
.map_err(Error::msg)?;
Ok(Some(Body::Status(status_unseen)))
} else {
Ok(None)
}
}
/// Produce an EXISTS message corresponding to the number of mails
/// in `known_state`
fn exists(&self) -> Result<Body> {
let exists = u32::try_from(self.known_state.idx_by_uid.len())?;
Ok(Body::Data(Data::Exists(exists)))
}
/// Produce a RECENT message corresponding to the number of
/// recent mails in `known_state`
fn recent(&self) -> Result<Body> {
let recent = self
.known_state
.idx_by_flag
.get(&"\\Recent".to_string())
.map(|os| os.len())
.unwrap_or(0);
let recent = u32::try_from(recent)?;
Ok(Body::Data(Data::Recent(recent)))
}
/// Produce a FLAGS and a PERMANENTFLAGS message that indicates
/// the flags that are in `known_state` + default flags
fn flags(&self) -> Result<Vec<Body>> {
let mut flags: Vec<Flag> = self
.known_state
.idx_by_flag
.flags()
.map(|f| match f.chars().next() {
Some('\\') => None,
Some('$') if f == "$unseen" => None,
Some(_) => match Atom::try_from(f.clone()) {
Err(_) => {
tracing::error!(flag=%f, "Unable to encode flag as IMAP atom");
None
}
Ok(a) => Some(Flag::Keyword(a)),
},
None => None,
})
.flatten()
.collect();
flags.extend_from_slice(&DEFAULT_FLAGS);
let mut ret = vec![Body::Data(Data::Flags(flags.clone()))];
flags.push(Flag::Permanent);
let permanent_flags =
Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted")
.map_err(Error::msg)?;
ret.push(Body::Status(permanent_flags));
Ok(ret)
}
}

View file

@ -1,5 +1,6 @@
mod command;
mod flow;
mod mailbox_view;
mod session;
use std::task::{Context, Poll};

View file

@ -2,7 +2,6 @@ use anyhow::Result;
use async_trait::async_trait;
use ldap3::{LdapConnAsync, Scope, SearchEntry};
use log::debug;
use rusoto_signature::Region;
use crate::config::*;
use crate::login::*;

View file

@ -13,7 +13,6 @@ use rand::prelude::*;
use rusoto_core::HttpClient;
use rusoto_credential::{AwsCredentials, StaticProvider};
use rusoto_s3::S3Client;
use rusoto_signature::Region;
use crate::cryptoblob::*;
@ -52,7 +51,7 @@ pub struct PublicCredentials {
}
/// The struct StorageCredentials contains access key to an S3 and K2V bucket
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct StorageCredentials {
pub s3_region: Region,
pub k2v_region: Region,
@ -87,6 +86,24 @@ pub struct CryptoKeys {
pub public: PublicKey,
}
/// A custom S3 region, composed of a region name and endpoint.
/// We use this instead of rusoto_signature::Region so that we can
/// derive Hash and Eq
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Region {
pub name: String,
pub endpoint: String,
}
impl Region {
pub fn as_rusoto_region(&self) -> rusoto_signature::Region {
rusoto_signature::Region::Custom {
name: self.name.clone(),
endpoint: self.endpoint.clone(),
}
}
}
// ----
impl Credentials {
@ -111,7 +128,7 @@ impl StorageCredentials {
);
Ok(K2vClient::new(
self.k2v_region.clone(),
self.k2v_region.as_rusoto_region(),
self.bucket.clone(),
aws_creds,
None,
@ -127,7 +144,7 @@ impl StorageCredentials {
Ok(S3Client::new_with(
HttpClient::new()?,
aws_creds_provider,
self.s3_region.clone(),
self.s3_region.as_rusoto_region(),
))
}
}

View file

@ -3,7 +3,6 @@ use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use rusoto_signature::Region;
use crate::config::*;
use crate::cryptoblob::{Key, SecretKey};

View file

@ -3,6 +3,7 @@ use std::convert::TryFrom;
use anyhow::Result;
use k2v_client::K2vClient;
use rusoto_s3::S3Client;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
use crate::cryptoblob::Key;
@ -11,16 +12,16 @@ use crate::mail::mail_ident::*;
use crate::mail::uidindex::*;
use crate::mail::IMF;
pub struct Summary<'a> {
pub struct Summary {
pub validity: ImapUidvalidity,
pub next: ImapUid,
pub exists: u32,
pub recent: u32,
pub flags: FlagIter<'a>,
pub unseen: Option<&'a ImapUid>,
pub flags: Vec<String>,
pub unseen: Option<ImapUid>,
}
impl std::fmt::Display for Summary<'_> {
impl std::fmt::Display for Summary {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
@ -30,11 +31,67 @@ impl std::fmt::Display for Summary<'_> {
}
}
pub struct Mailbox(RwLock<MailboxInternal>);
impl Mailbox {
pub(super) async fn open(creds: &Credentials, name: &str) -> Result<Self> {
let index_path = format!("index/{}", name);
let mail_path = format!("mail/{}", name);
let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
uid_index.sync().await?;
Ok(Self(RwLock::new(MailboxInternal {
bucket: creds.bucket().to_string(),
key: creds.keys.master.clone(),
k2v: creds.k2v_client()?,
s3: creds.s3_client()?,
uid_index,
mail_path,
})))
}
/// Get a clone of the current UID Index of this mailbox
/// (cloning is cheap so don't hesitate to use this)
pub async fn current_uid_index(&self) -> UidIndex {
self.0.read().await.uid_index.state().clone()
}
/// Insert an email in the mailbox
pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> {
unimplemented!()
}
/// Copy an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations)
pub async fn copy(&self, _from: &Mailbox, _uid: ImapUid) -> Result<()> {
unimplemented!()
}
/// Delete all emails with the \Delete flag in the mailbox
/// Can be called by CLOSE and EXPUNGE
/// @FIXME do we want to implement this feature or a simpler "delete" command
/// The controller could then "fetch \Delete" and call delete on each email?
pub async fn expunge(&self) -> Result<()> {
unimplemented!()
}
/// Update flags of a range of emails
pub async fn store(&self) -> Result<()> {
unimplemented!()
}
pub async fn fetch(&self) -> Result<()> {
unimplemented!()
}
}
// ----
// Non standard but common flags:
// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
pub struct Mailbox {
struct MailboxInternal {
bucket: String,
pub name: String,
key: Key,
k2v: K2vClient,
@ -44,78 +101,7 @@ pub struct Mailbox {
mail_path: String,
}
impl Mailbox {
pub(super) fn new(creds: &Credentials, name: &str) -> Result<Self> {
let index_path = format!("index/{}", name);
let mail_path = format!("mail/{}", name);
let uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
Ok(Self {
bucket: creds.bucket().to_string(),
name: name.to_string(), // TODO: don't use name field if possible, use mail_path instead
key: creds.keys.master.clone(),
k2v: creds.k2v_client()?,
s3: creds.s3_client()?,
uid_index,
mail_path,
})
}
/// Get a summary of the mailbox, useful for the SELECT command for example
pub async fn summary(&mut self) -> Result<Summary> {
self.uid_index.sync().await?;
let state = self.uid_index.state();
let unseen = state
.idx_by_flag
.get(&"$unseen".to_string())
.and_then(|os| os.get_min());
let recent = state
.idx_by_flag
.get(&"\\Recent".to_string())
.map(|os| os.len())
.unwrap_or(0);
return Ok(Summary {
validity: state.uidvalidity,
next: state.uidnext,
exists: u32::try_from(state.idx_by_uid.len())?,
recent: u32::try_from(recent)?,
flags: state.idx_by_flag.flags(),
unseen,
});
}
/// Insert an email in the mailbox
pub async fn append(&mut self, _msg: IMF) -> Result<()> {
unimplemented!()
}
/// Copy an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations)
pub async fn copy(&mut self, _from: &Mailbox, _uid: ImapUid) -> Result<()> {
unimplemented!()
}
/// Delete all emails with the \Delete flag in the mailbox
/// Can be called by CLOSE and EXPUNGE
/// @FIXME do we want to implement this feature or a simpler "delete" command
/// The controller could then "fetch \Delete" and call delete on each email?
pub async fn expunge(&mut self) -> Result<()> {
unimplemented!()
}
/// Update flags of a range of emails
pub async fn store(&mut self) -> Result<()> {
unimplemented!()
}
pub async fn fetch(&mut self) -> Result<()> {
unimplemented!()
}
// ----
impl MailboxInternal {
pub async fn test(&mut self) -> Result<()> {
self.uid_index.sync().await?;

View file

@ -1,6 +1,6 @@
pub mod mail_ident;
pub mod mailbox;
mod uidindex;
pub mod uidindex;
pub mod user;
use std::convert::TryFrom;
@ -17,4 +17,7 @@ use crate::mail::uidindex::*;
// Internet Message Format
// aka RFC 822 - RFC 2822 - RFC 5322
pub struct IMF(Vec<u8>);
pub struct IMF<'a> {
raw: &'a [u8],
parsed: mail_parser::Message<'a>,
}

View file

@ -1,9 +1,13 @@
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use anyhow::Result;
use lazy_static::lazy_static;
use k2v_client::K2vClient;
use rusoto_s3::S3Client;
use crate::login::Credentials;
use crate::login::{Credentials, StorageCredentials};
use crate::mail::mailbox::Mailbox;
pub struct User {
@ -31,8 +35,24 @@ impl User {
}
/// Opens an existing mailbox given its IMAP name.
pub fn open_mailbox(&self, name: &str) -> Result<Option<Mailbox>> {
Mailbox::new(&self.creds, name).map(Some)
pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
{
let cache = MAILBOX_CACHE.cache.lock().unwrap();
if let Some(mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) {
return Ok(Some(mb));
}
}
let mb = Arc::new(Mailbox::open(&self.creds, name).await?);
let mut cache = MAILBOX_CACHE.cache.lock().unwrap();
if let Some(concurrent_mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) {
drop(mb); // we worked for nothing but at least we didn't starve someone else
Ok(Some(concurrent_mb))
} else {
cache.insert(self.creds.storage.clone(), Arc::downgrade(&mb));
Ok(Some(mb))
}
}
/// Creates a new mailbox in the user's IMAP namespace.
@ -50,3 +70,21 @@ impl User {
unimplemented!()
}
}
// ---- Mailbox cache ----
struct MailboxCache {
cache: std::sync::Mutex<HashMap<StorageCredentials, Weak<Mailbox>>>,
}
impl MailboxCache {
fn new() -> Self {
Self {
cache: std::sync::Mutex::new(HashMap::new()),
}
}
}
lazy_static! {
static ref MAILBOX_CACHE: MailboxCache = MailboxCache::new();
}

View file

@ -14,8 +14,6 @@ use anyhow::{bail, Result};
use clap::{Parser, Subcommand};
use rand::prelude::*;
use rusoto_signature::Region;
use config::*;
use cryptoblob::*;
use login::{static_provider::*, *};
@ -264,11 +262,11 @@ async fn main() -> Result<()> {
}
fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials {
let s3_region = Region::Custom {
let s3_region = Region {
name: c.region.clone(),
endpoint: c.s3_endpoint,
};
let k2v_region = Region::Custom {
let k2v_region = Region {
name: c.region,
endpoint: c.k2v_endpoint,
};

View file

@ -3,14 +3,13 @@ use std::sync::Arc;
use anyhow::{bail, Result};
use futures::{try_join, StreamExt};
use log::*;
use rusoto_signature::Region;
use tokio::sync::watch;
use crate::config::*;
use crate::imap;
use crate::lmtp::*;
use crate::login::ArcLoginProvider;
use crate::login::{ldap_provider::*, static_provider::*};
use crate::login::{ldap_provider::*, static_provider::*, Region};
pub struct Server {
lmtp_server: Option<Arc<LmtpServer>>,
@ -62,11 +61,11 @@ impl Server {
}
fn build(config: Config) -> Result<(ArcLoginProvider, Option<LmtpConfig>, Option<ImapConfig>)> {
let s3_region = Region::Custom {
let s3_region = Region {
name: config.aws_region.clone(),
endpoint: config.s3_endpoint,
};
let k2v_region = Region::Custom {
let k2v_region = Region {
name: config.aws_region,
endpoint: config.k2v_endpoint,
};