diff --git a/Cargo.lock b/Cargo.lock index b433689..a430b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -689,6 +689,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.7.1" @@ -1259,6 +1268,16 @@ dependencies = [ "value-bag", ] +[[package]] +name = "mail-parser" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c46a841ae5276aba5218ade7bb76896358f9f95a925c7b3deea6a0ec0fb8e2a7" +dependencies = [ + "encoding_rs", + "serde", +] + [[package]] name = "mailrage" version = "0.0.1" @@ -1279,6 +1298,7 @@ dependencies = [ "lazy_static", "ldap3", "log", + "mail-parser", "pretty_env_logger", "rand", "rmp-serde", diff --git a/Cargo.toml b/Cargo.toml index 4393d1c..be05124 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ itertools = "0.10" lazy_static = "1.4" ldap3 = { version = "0.10", default-features = false, features = ["tls"] } log = "0.4" +mail-parser = "0.4.8" pretty_env_logger = "0.4" rusoto_core = "0.48.0" rusoto_credential = "0.48.0" diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 47df5be..443edda 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -8,18 +8,11 @@ use imap_codec::types::response::{Code, Data, Status}; use crate::imap::command::anonymous; use crate::imap::flow; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; -const DEFAULT_FLAGS: [Flag; 5] = [ - Flag::Seen, - Flag::Answered, - Flag::Flagged, - Flag::Deleted, - Flag::Draft, -]; - pub struct AuthenticatedContext<'a> { pub req: &'a Request, pub user: &'a User, @@ -96,59 +89,24 @@ impl<'a> AuthenticatedContext<'a> { async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> { let name = String::try_from(mailbox.clone())?; - let mut mb = self.user.open_mailbox(name)?; + let mb_opt = self.user.open_mailbox(&name).await?; + let mb = match mb_opt { + Some(mb) => mb, + None => { + return Ok(( + Response::no("Mailbox does not exist")?, + flow::Transition::None, + )) + } + }; tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected"); - let sum = mb.summary().await?; - tracing::trace!(summary=%sum, "mailbox.summary"); - - let mut res = Vec::::new(); - - res.push(Body::Data(Data::Exists(sum.exists))); - - res.push(Body::Data(Data::Recent(sum.recent))); - - let mut flags: Vec = sum.flags.map(|f| match f.chars().next() { - Some('\\') => None, - Some('$') if f == "$unseen" => None, - Some(_) => match Atom::try_from(f.clone()) { - Err(_) => { - tracing::error!(username=%self.user.username, mailbox=%name, flag=%f, "Unable to encode flag as IMAP atom"); - None - }, - Ok(a) => Some(Flag::Keyword(a)), - }, - None => None, - }).flatten().collect(); - flags.extend_from_slice(&DEFAULT_FLAGS); - - res.push(Body::Data(Data::Flags(flags.clone()))); - - let uid_validity = Status::ok(None, Some(Code::UidValidity(sum.validity)), "UIDs valid") - .map_err(Error::msg)?; - res.push(Body::Status(uid_validity)); - - let next_uid = Status::ok(None, Some(Code::UidNext(sum.next)), "Predict next UID") - .map_err(Error::msg)?; - res.push(Body::Status(next_uid)); - - if let Some(unseen) = sum.unseen { - let status_unseen = - Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID") - .map_err(Error::msg)?; - res.push(Body::Status(status_unseen)); - } - - flags.push(Flag::Permanent); - let permanent_flags = - Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted") - .map_err(Error::msg)?; - res.push(Body::Status(permanent_flags)); + let (mb, data) = MailboxView::new(mb).await?; Ok(( Response::ok("Select completed")? .with_extra_code(Code::ReadWrite) - .with_body(res), + .with_body(data), flow::Transition::Select(mb), )) } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index b1bba23..4e3ff2f 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -9,6 +9,7 @@ use imap_codec::types::sequence::SequenceSet; use crate::imap::command::authenticated; use crate::imap::flow; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; @@ -16,7 +17,7 @@ use crate::mail::user::User; pub struct SelectedContext<'a> { pub req: &'a Request, pub user: &'a User, - pub mailbox: &'a mut Mailbox, + pub mailbox: &'a mut MailboxView, } pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::Transition)> { diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 0fe6f92..c9d7e40 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,6 +1,7 @@ use std::error::Error as StdError; use std::fmt; +use crate::imap::mailbox_view::MailboxView; use crate::mail::mailbox::Mailbox; use crate::mail::user::User; @@ -18,14 +19,14 @@ impl StdError for Error {} pub enum State { NotAuthenticated, Authenticated(User), - Selected(User, Mailbox), + Selected(User, MailboxView), Logout, } pub enum Transition { None, Authenticate(User), - Select(Mailbox), + Select(MailboxView), Unselect, Logout, } diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs new file mode 100644 index 0000000..ec5580d --- /dev/null +++ b/src/imap/mailbox_view.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; + +use anyhow::{Error, Result}; +use boitalettres::proto::{res::body::Data as Body, Request, Response}; +use imap_codec::types::command::CommandBody; +use imap_codec::types::core::Atom; +use imap_codec::types::flag::Flag; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Code, Data, Status}; + +use crate::mail::mailbox::{Mailbox, Summary}; +use crate::mail::uidindex::UidIndex; + +const DEFAULT_FLAGS: [Flag; 5] = [ + Flag::Seen, + Flag::Answered, + Flag::Flagged, + Flag::Deleted, + Flag::Draft, +]; + +/// A MailboxView is responsible for giving the client the information +/// it needs about a mailbox, such as an initial summary of the mailbox's +/// content and continuous updates indicating when the content +/// of the mailbox has been changed. +/// To do this, it keeps a variable `known_state` that corresponds to +/// what the client knows, and produces IMAP messages to be sent to the +/// client that go along updates to `known_state`. +pub struct MailboxView { + mailbox: Arc, + known_state: UidIndex, +} + +impl MailboxView { + /// Creates a new IMAP view into a mailbox. + /// Generates the necessary IMAP messages so that the client + /// has a satisfactory summary of the current mailbox's state. + pub async fn new(mailbox: Arc) -> Result<(Self, Vec)> { + let state = mailbox.current_uid_index().await; + + let new_view = Self { + mailbox, + known_state: state, + }; + + let mut data = Vec::::new(); + data.push(new_view.exists()?); + data.push(new_view.recent()?); + data.extend(new_view.flags()?.into_iter()); + data.push(new_view.uidvalidity()?); + data.push(new_view.uidnext()?); + if let Some(unseen) = new_view.unseen()? { + data.push(unseen); + } + + Ok((new_view, data)) + } + + // ---- + + /// Produce an OK [UIDVALIDITY _] message corresponding to `known_state` + fn uidvalidity(&self) -> Result { + let uid_validity = Status::ok( + None, + Some(Code::UidValidity(self.known_state.uidvalidity)), + "UIDs valid", + ) + .map_err(Error::msg)?; + Ok(Body::Status(uid_validity)) + } + + /// Produce an OK [UIDNEXT _] message corresponding to `known_state` + fn uidnext(&self) -> Result { + let next_uid = Status::ok( + None, + Some(Code::UidNext(self.known_state.uidnext)), + "Predict next UID", + ) + .map_err(Error::msg)?; + Ok(Body::Status(next_uid)) + } + + /// Produces an UNSEEN message (if relevant) corresponding to the + /// first unseen message id in `known_state` + fn unseen(&self) -> Result> { + let unseen = self + .known_state + .idx_by_flag + .get(&"$unseen".to_string()) + .and_then(|os| os.get_min()) + .cloned(); + if let Some(unseen) = unseen { + let status_unseen = + Status::ok(None, Some(Code::Unseen(unseen.clone())), "First unseen UID") + .map_err(Error::msg)?; + Ok(Some(Body::Status(status_unseen))) + } else { + Ok(None) + } + } + + /// Produce an EXISTS message corresponding to the number of mails + /// in `known_state` + fn exists(&self) -> Result { + let exists = u32::try_from(self.known_state.idx_by_uid.len())?; + Ok(Body::Data(Data::Exists(exists))) + } + + /// Produce a RECENT message corresponding to the number of + /// recent mails in `known_state` + fn recent(&self) -> Result { + let recent = self + .known_state + .idx_by_flag + .get(&"\\Recent".to_string()) + .map(|os| os.len()) + .unwrap_or(0); + let recent = u32::try_from(recent)?; + Ok(Body::Data(Data::Recent(recent))) + } + + /// Produce a FLAGS and a PERMANENTFLAGS message that indicates + /// the flags that are in `known_state` + default flags + fn flags(&self) -> Result> { + let mut flags: Vec = self + .known_state + .idx_by_flag + .flags() + .map(|f| match f.chars().next() { + Some('\\') => None, + Some('$') if f == "$unseen" => None, + Some(_) => match Atom::try_from(f.clone()) { + Err(_) => { + tracing::error!(flag=%f, "Unable to encode flag as IMAP atom"); + None + } + Ok(a) => Some(Flag::Keyword(a)), + }, + None => None, + }) + .flatten() + .collect(); + flags.extend_from_slice(&DEFAULT_FLAGS); + let mut ret = vec![Body::Data(Data::Flags(flags.clone()))]; + + flags.push(Flag::Permanent); + let permanent_flags = + Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted") + .map_err(Error::msg)?; + ret.push(Body::Status(permanent_flags)); + + Ok(ret) + } +} diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 0e9f49a..f85bcc6 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -1,5 +1,6 @@ mod command; mod flow; +mod mailbox_view; mod session; use std::task::{Context, Poll}; diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 9310e55..2eeb6d9 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -2,7 +2,6 @@ use anyhow::Result; use async_trait::async_trait; use ldap3::{LdapConnAsync, Scope, SearchEntry}; use log::debug; -use rusoto_signature::Region; use crate::config::*; use crate::login::*; diff --git a/src/login/mod.rs b/src/login/mod.rs index 1d5d634..0605d4e 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -13,7 +13,6 @@ use rand::prelude::*; use rusoto_core::HttpClient; use rusoto_credential::{AwsCredentials, StaticProvider}; use rusoto_s3::S3Client; -use rusoto_signature::Region; use crate::cryptoblob::*; @@ -52,7 +51,7 @@ pub struct PublicCredentials { } /// The struct StorageCredentials contains access key to an S3 and K2V bucket -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct StorageCredentials { pub s3_region: Region, pub k2v_region: Region, @@ -87,6 +86,24 @@ pub struct CryptoKeys { pub public: PublicKey, } +/// A custom S3 region, composed of a region name and endpoint. +/// We use this instead of rusoto_signature::Region so that we can +/// derive Hash and Eq +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct Region { + pub name: String, + pub endpoint: String, +} + +impl Region { + pub fn as_rusoto_region(&self) -> rusoto_signature::Region { + rusoto_signature::Region::Custom { + name: self.name.clone(), + endpoint: self.endpoint.clone(), + } + } +} + // ---- impl Credentials { @@ -111,7 +128,7 @@ impl StorageCredentials { ); Ok(K2vClient::new( - self.k2v_region.clone(), + self.k2v_region.as_rusoto_region(), self.bucket.clone(), aws_creds, None, @@ -127,7 +144,7 @@ impl StorageCredentials { Ok(S3Client::new_with( HttpClient::new()?, aws_creds_provider, - self.s3_region.clone(), + self.s3_region.as_rusoto_region(), )) } } diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 6bbc717..5ea765f 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; -use rusoto_signature::Region; use crate::config::*; use crate::cryptoblob::{Key, SecretKey}; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index a2d28fb..9e8f0db 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -3,6 +3,7 @@ use std::convert::TryFrom; use anyhow::Result; use k2v_client::K2vClient; use rusoto_s3::S3Client; +use tokio::sync::RwLock; use crate::bayou::Bayou; use crate::cryptoblob::Key; @@ -11,16 +12,16 @@ use crate::mail::mail_ident::*; use crate::mail::uidindex::*; use crate::mail::IMF; -pub struct Summary<'a> { +pub struct Summary { pub validity: ImapUidvalidity, pub next: ImapUid, pub exists: u32, pub recent: u32, - pub flags: FlagIter<'a>, - pub unseen: Option<&'a ImapUid>, + pub flags: Vec, + pub unseen: Option, } -impl std::fmt::Display for Summary<'_> { +impl std::fmt::Display for Summary { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, @@ -30,11 +31,67 @@ impl std::fmt::Display for Summary<'_> { } } +pub struct Mailbox(RwLock); + +impl Mailbox { + pub(super) async fn open(creds: &Credentials, name: &str) -> Result { + let index_path = format!("index/{}", name); + let mail_path = format!("mail/{}", name); + + let mut uid_index = Bayou::::new(creds, index_path)?; + uid_index.sync().await?; + + Ok(Self(RwLock::new(MailboxInternal { + bucket: creds.bucket().to_string(), + key: creds.keys.master.clone(), + k2v: creds.k2v_client()?, + s3: creds.s3_client()?, + uid_index, + mail_path, + }))) + } + + /// Get a clone of the current UID Index of this mailbox + /// (cloning is cheap so don't hesitate to use this) + pub async fn current_uid_index(&self) -> UidIndex { + self.0.read().await.uid_index.state().clone() + } + + /// Insert an email in the mailbox + pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> { + unimplemented!() + } + + /// Copy an email from an other Mailbox to this mailbox + /// (use this when possible, as it allows for a certain number of storage optimizations) + pub async fn copy(&self, _from: &Mailbox, _uid: ImapUid) -> Result<()> { + unimplemented!() + } + + /// Delete all emails with the \Delete flag in the mailbox + /// Can be called by CLOSE and EXPUNGE + /// @FIXME do we want to implement this feature or a simpler "delete" command + /// The controller could then "fetch \Delete" and call delete on each email? + pub async fn expunge(&self) -> Result<()> { + unimplemented!() + } + + /// Update flags of a range of emails + pub async fn store(&self) -> Result<()> { + unimplemented!() + } + + pub async fn fetch(&self) -> Result<()> { + unimplemented!() + } +} + +// ---- + // Non standard but common flags: // https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml -pub struct Mailbox { +struct MailboxInternal { bucket: String, - pub name: String, key: Key, k2v: K2vClient, @@ -44,78 +101,7 @@ pub struct Mailbox { mail_path: String, } -impl Mailbox { - pub(super) fn new(creds: &Credentials, name: &str) -> Result { - let index_path = format!("index/{}", name); - let mail_path = format!("mail/{}", name); - let uid_index = Bayou::::new(creds, index_path)?; - - Ok(Self { - bucket: creds.bucket().to_string(), - name: name.to_string(), // TODO: don't use name field if possible, use mail_path instead - key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, - uid_index, - mail_path, - }) - } - - /// Get a summary of the mailbox, useful for the SELECT command for example - pub async fn summary(&mut self) -> Result { - self.uid_index.sync().await?; - let state = self.uid_index.state(); - - let unseen = state - .idx_by_flag - .get(&"$unseen".to_string()) - .and_then(|os| os.get_min()); - let recent = state - .idx_by_flag - .get(&"\\Recent".to_string()) - .map(|os| os.len()) - .unwrap_or(0); - - return Ok(Summary { - validity: state.uidvalidity, - next: state.uidnext, - exists: u32::try_from(state.idx_by_uid.len())?, - recent: u32::try_from(recent)?, - flags: state.idx_by_flag.flags(), - unseen, - }); - } - - /// Insert an email in the mailbox - pub async fn append(&mut self, _msg: IMF) -> Result<()> { - unimplemented!() - } - - /// Copy an email from an other Mailbox to this mailbox - /// (use this when possible, as it allows for a certain number of storage optimizations) - pub async fn copy(&mut self, _from: &Mailbox, _uid: ImapUid) -> Result<()> { - unimplemented!() - } - - /// Delete all emails with the \Delete flag in the mailbox - /// Can be called by CLOSE and EXPUNGE - /// @FIXME do we want to implement this feature or a simpler "delete" command - /// The controller could then "fetch \Delete" and call delete on each email? - pub async fn expunge(&mut self) -> Result<()> { - unimplemented!() - } - - /// Update flags of a range of emails - pub async fn store(&mut self) -> Result<()> { - unimplemented!() - } - - pub async fn fetch(&mut self) -> Result<()> { - unimplemented!() - } - - // ---- - +impl MailboxInternal { pub async fn test(&mut self) -> Result<()> { self.uid_index.sync().await?; diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 4339038..70182a9 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,6 +1,6 @@ pub mod mail_ident; pub mod mailbox; -mod uidindex; +pub mod uidindex; pub mod user; use std::convert::TryFrom; @@ -17,4 +17,7 @@ use crate::mail::uidindex::*; // Internet Message Format // aka RFC 822 - RFC 2822 - RFC 5322 -pub struct IMF(Vec); +pub struct IMF<'a> { + raw: &'a [u8], + parsed: mail_parser::Message<'a>, +} diff --git a/src/mail/user.rs b/src/mail/user.rs index 4864509..e2b33e2 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -1,9 +1,13 @@ +use std::collections::HashMap; +use std::sync::{Arc, Weak}; + use anyhow::Result; +use lazy_static::lazy_static; use k2v_client::K2vClient; use rusoto_s3::S3Client; -use crate::login::Credentials; +use crate::login::{Credentials, StorageCredentials}; use crate::mail::mailbox::Mailbox; pub struct User { @@ -31,8 +35,24 @@ impl User { } /// Opens an existing mailbox given its IMAP name. - pub fn open_mailbox(&self, name: &str) -> Result> { - Mailbox::new(&self.creds, name).map(Some) + pub async fn open_mailbox(&self, name: &str) -> Result>> { + { + let cache = MAILBOX_CACHE.cache.lock().unwrap(); + if let Some(mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) { + return Ok(Some(mb)); + } + } + + let mb = Arc::new(Mailbox::open(&self.creds, name).await?); + + let mut cache = MAILBOX_CACHE.cache.lock().unwrap(); + if let Some(concurrent_mb) = cache.get(&self.creds.storage).and_then(Weak::upgrade) { + drop(mb); // we worked for nothing but at least we didn't starve someone else + Ok(Some(concurrent_mb)) + } else { + cache.insert(self.creds.storage.clone(), Arc::downgrade(&mb)); + Ok(Some(mb)) + } } /// Creates a new mailbox in the user's IMAP namespace. @@ -50,3 +70,21 @@ impl User { unimplemented!() } } + +// ---- Mailbox cache ---- + +struct MailboxCache { + cache: std::sync::Mutex>>, +} + +impl MailboxCache { + fn new() -> Self { + Self { + cache: std::sync::Mutex::new(HashMap::new()), + } + } +} + +lazy_static! { + static ref MAILBOX_CACHE: MailboxCache = MailboxCache::new(); +} diff --git a/src/main.rs b/src/main.rs index 401ed53..5d139b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,6 @@ use anyhow::{bail, Result}; use clap::{Parser, Subcommand}; use rand::prelude::*; -use rusoto_signature::Region; - use config::*; use cryptoblob::*; use login::{static_provider::*, *}; @@ -264,11 +262,11 @@ async fn main() -> Result<()> { } fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials { - let s3_region = Region::Custom { + let s3_region = Region { name: c.region.clone(), endpoint: c.s3_endpoint, }; - let k2v_region = Region::Custom { + let k2v_region = Region { name: c.region, endpoint: c.k2v_endpoint, }; diff --git a/src/server.rs b/src/server.rs index 2cc481c..55fa5ba 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use anyhow::{bail, Result}; use futures::{try_join, StreamExt}; use log::*; -use rusoto_signature::Region; use tokio::sync::watch; use crate::config::*; use crate::imap; use crate::lmtp::*; use crate::login::ArcLoginProvider; -use crate::login::{ldap_provider::*, static_provider::*}; +use crate::login::{ldap_provider::*, static_provider::*, Region}; pub struct Server { lmtp_server: Option>, @@ -62,11 +61,11 @@ impl Server { } fn build(config: Config) -> Result<(ArcLoginProvider, Option, Option)> { - let s3_region = Region::Custom { + let s3_region = Region { name: config.aws_region.clone(), endpoint: config.s3_endpoint, }; - let k2v_region = Region::Custom { + let k2v_region = Region { name: config.aws_region, endpoint: config.k2v_endpoint, };