From 9c95b261e00299933368a18ce56e238f50874cc4 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 Jun 2022 10:19:24 +0200 Subject: [PATCH] cargo fmt + WIP uid index improvement --- src/command.rs | 71 ++++++++++++++++++++++++++++++------------------- src/mailbox.rs | 20 ++++++++------ src/server.rs | 10 +++---- src/service.rs | 9 +++---- src/session.rs | 61 +++++++++++++++++++++++++++++------------- src/uidindex.rs | 23 +++++++++++++--- 6 files changed, 126 insertions(+), 68 deletions(-) diff --git a/src/command.rs b/src/command.rs index 4391b39..2c61227 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,14 +1,14 @@ use anyhow::Result; -use boitalettres::proto::{Request, Response}; use boitalettres::errors::Error as BalError; -use imap_codec::types::core::{Tag, AString}; -use imap_codec::types::response::{Capability, Data}; -use imap_codec::types::mailbox::{Mailbox as MailboxCodec, ListMailbox}; -use imap_codec::types::sequence::SequenceSet; +use boitalettres::proto::{Request, Response}; +use imap_codec::types::core::{AString, Tag}; use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; +use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec}; +use imap_codec::types::response::{Capability, Data}; +use imap_codec::types::sequence::SequenceSet; -use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; +use crate::mailstore::Mailstore; use crate::session; pub struct Command<'a> { @@ -34,40 +34,55 @@ impl<'a> Command<'a> { tracing::info!(user = %u, "command.login"); let creds = match self.session.mailstore.login_provider.login(&u, &p).await { - Err(_) => return Ok(Response::no("[AUTHENTICATIONFAILED] Authentication failed.")?), + Err(_) => { + return Ok(Response::no( + "[AUTHENTICATIONFAILED] Authentication failed.", + )?) + } Ok(c) => c, }; - self.session.user = Some(session::User { creds, name: u.clone(), }); + self.session.user = Some(session::User { + creds, + name: u.clone(), + }); tracing::info!(username=%u, "connected"); Ok(Response::ok("Logged in")?) } - pub async fn lsub(&self, reference: MailboxCodec, mailbox_wildcard: ListMailbox) -> Result { + pub async fn lsub( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result { Ok(Response::bad("Not implemented")?) } - pub async fn list(&self, reference: MailboxCodec, mailbox_wildcard: ListMailbox) -> Result { + pub async fn list( + &self, + reference: MailboxCodec, + mailbox_wildcard: ListMailbox, + ) -> Result { Ok(Response::bad("Not implemented")?) } /* - * TRACE BEGIN --- + * TRACE BEGIN --- - Example: C: A142 SELECT INBOX - S: * 172 EXISTS - S: * 1 RECENT - S: * OK [UNSEEN 12] Message 12 is first unseen - S: * OK [UIDVALIDITY 3857529045] UIDs valid - S: * OK [UIDNEXT 4392] Predicted next UID - S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) - S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited - S: A142 OK [READ-WRITE] SELECT completed + Example: C: A142 SELECT INBOX + S: * 172 EXISTS + S: * 1 RECENT + S: * OK [UNSEEN 12] Message 12 is first unseen + S: * OK [UIDVALIDITY 3857529045] UIDs valid + S: * OK [UIDNEXT 4392] Predicted next UID + S: * FLAGS (\Answered \Flagged \Deleted \Seen \Draft) + S: * OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited + S: A142 OK [READ-WRITE] SELECT completed - * TRACE END --- - */ + * TRACE END --- + */ pub async fn select(&mut self, mailbox: MailboxCodec) -> Result { let name = String::try_from(mailbox)?; let user = match self.session.user.as_ref() { @@ -81,16 +96,18 @@ impl<'a> Command<'a> { let sum = mb.summary().await?; tracing::trace!(summary=%sum, "mailbox.summary"); - let body = vec![ - Data::Exists(sum.exists.try_into()?), - Data::Recent(0), - ]; + let body = vec![Data::Exists(sum.exists.try_into()?), Data::Recent(0)]; self.session.selected = Some(mb); Ok(Response::ok("[READ-WRITE] Select completed")?.with_body(body)) } - pub async fn fetch(&self, sequence_set: SequenceSet, attributes: MacroOrFetchAttributes, uid: bool) -> Result { + pub async fn fetch( + &self, + sequence_set: SequenceSet, + attributes: MacroOrFetchAttributes, + uid: bool, + ) -> Result { Ok(Response::bad("Not implemented")?) } } diff --git a/src/mailbox.rs b/src/mailbox.rs index 3b1ac99..9d6c1fc 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -9,13 +9,17 @@ use crate::login::Credentials; use crate::uidindex::*; pub struct Summary { - pub validity: ImapUidvalidity, - pub next: ImapUid, - pub exists: usize, + pub validity: ImapUidvalidity, + pub next: ImapUid, + pub exists: usize, } impl std::fmt::Display for Summary { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "uidvalidity: {}, uidnext: {}, exists: {}", self.validity, self.next, self.exists) + write!( + f, + "uidvalidity: {}, uidnext: {}, exists: {}", + self.validity, self.next, self.exists + ) } } @@ -49,10 +53,10 @@ impl Mailbox { let state = self.uid_index.state(); return Ok(Summary { - validity: state.uidvalidity, - next: state.uidnext, - exists: state.mail_uid.len(), - }) + validity: state.uidvalidity, + next: state.uidnext, + exists: state.mail_uid.len(), + }); } pub async fn test(&mut self) -> Result<()> { diff --git a/src/server.rs b/src/server.rs index 2bbdc2b..365bc0f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,8 +2,8 @@ use anyhow::Result; use std::sync::Arc; use crate::config::*; -use crate::service; use crate::mailstore; +use crate::service; use boitalettres::server::accept::addr::AddrIncoming; use boitalettres::server::Server as ImapServer; @@ -24,10 +24,10 @@ impl Server { tracing::info!("Starting server on {:#}", self.incoming.local_addr); /*let creds = self - .mailstore - .login_provider - .login("quentin", "poupou") - .await?;*/ + .mailstore + .login_provider + .login("quentin", "poupou") + .await?;*/ //let mut mailbox = Mailbox::new(&creds, "TestMailbox".to_string()).await?; //mailbox.test().await?; diff --git a/src/service.rs b/src/service.rs index 35c753b..f99ba7a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; -use boitalettres::server::accept::addr::AddrStream; use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; +use boitalettres::server::accept::addr::AddrStream; use futures::future::BoxFuture; use futures::future::FutureExt; use tower::Service; @@ -41,7 +41,9 @@ pub struct Connection { } impl Connection { pub fn new(mailstore: Arc) -> Self { - Self { session: session::Manager::new(mailstore) } + Self { + session: session::Manager::new(mailstore), + } } } impl Service for Connection { @@ -58,6 +60,3 @@ impl Service for Connection { self.session.process(req) } } - - - diff --git a/src/session.rs b/src/session.rs index d689e72..8ad44dd 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,21 +1,21 @@ use std::sync::Arc; -use boitalettres::proto::{Request, Response}; use boitalettres::errors::Error as BalError; -use imap_codec::types::command::CommandBody; -use tokio::sync::{oneshot,mpsc}; -use tokio::sync::mpsc::error::TrySendError; +use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; use futures::future::FutureExt; +use imap_codec::types::command::CommandBody; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::{mpsc, oneshot}; use crate::command; use crate::login::Credentials; -use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; +use crate::mailstore::Mailstore; /* This constant configures backpressure in the system, * or more specifically, how many pipelined messages are allowed - * before refusing them + * before refusing them */ const MAX_PIPELINED_COMMANDS: usize = 10; @@ -31,8 +31,8 @@ pub struct Manager { //@FIXME we should garbage collect the Instance when the Manager is destroyed. impl Manager { pub fn new(mailstore: Arc) -> Self { - let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { + let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { let mut instance = Instance::new(mailstore, rx); instance.start().await; }); @@ -49,8 +49,12 @@ impl Manager { // will probably be malicious so we "rate limit" them. match self.tx.try_send(msg) { Ok(()) => (), - Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), - Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), + Err(TrySendError::Full(_)) => { + return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed() + } + Err(TrySendError::Closed(_)) => { + return async { Response::bad("The session task has exited") }.boxed() + } }; // @FIXME add a timeout, handle a session that fails. @@ -60,9 +64,10 @@ impl Manager { Err(e) => { tracing::warn!("Got error {:#?}", e); Response::bad("No response from the session handler") - }, + } } - }.boxed() + } + .boxed() } } @@ -74,13 +79,18 @@ pub struct User { pub struct Instance { rx: mpsc::Receiver, - pub mailstore: Arc, + pub mailstore: Arc, pub selected: Option, pub user: Option, } impl Instance { fn new(mailstore: Arc, rx: mpsc::Receiver) -> Self { - Self { mailstore, rx, selected: None, user: None, } + Self { + mailstore, + rx, + selected: None, + user: None, + } } //@FIXME add a function that compute the runner's name from its local info @@ -96,11 +106,22 @@ impl Instance { let res = match msg.req.body { CommandBody::Capability => cmd.capability().await, CommandBody::Login { username, password } => cmd.login(username, password).await, - CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await, + CommandBody::Lsub { + reference, + mailbox_wildcard, + } => cmd.lsub(reference, mailbox_wildcard).await, + CommandBody::List { + reference, + mailbox_wildcard, + } => cmd.list(reference, mailbox_wildcard).await, CommandBody::Select { mailbox } => cmd.select(mailbox).await, - CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, - _ => Response::bad("Error in IMAP command received by server.").map_err(anyhow::Error::new), + CommandBody::Fetch { + sequence_set, + attributes, + uid, + } => cmd.fetch(sequence_set, attributes, uid).await, + _ => Response::bad("Error in IMAP command received by server.") + .map_err(anyhow::Error::new), }; let wrapped_res = res.or_else(|e| match e.downcast::() { @@ -113,7 +134,9 @@ impl Instance { //@FIXME I think we should quit this thread on error and having our manager watch it, // and then abort the session as it is corrupted. - msg.tx.send(wrapped_res).unwrap_or_else(|e| tracing::warn!("failed to send imap response to manager: {:#?}", e)); + msg.tx.send(wrapped_res).unwrap_or_else(|e| { + tracing::warn!("failed to send imap response to manager: {:#?}", e) + }); } //@FIXME add more info about the runner diff --git a/src/uidindex.rs b/src/uidindex.rs index 42aa3bc..99647bc 100644 --- a/src/uidindex.rs +++ b/src/uidindex.rs @@ -1,4 +1,4 @@ -use im::OrdMap; +use im::{HashMap, HashSet, OrdMap}; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use crate::bayou::*; @@ -9,16 +9,20 @@ pub type ImapUidvalidity = u32; /// A Mail UUID is composed of two components: /// - a process identifier, 128 bits /// - a sequence number, 64 bits -#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Debug)] pub struct MailUuid(pub [u8; 24]); #[derive(Clone)] +/// A UidIndex handles the mutable part of a mailbox +/// It is built by running the event log on it +/// Each applied log generates a new UidIndex by cloning the previous one +/// and applying the event. This is why we use immutable datastructures +/// that are optimized for cloning (they clone underlying values only if they are modified) pub struct UidIndex { pub mail_uid: OrdMap, pub mail_flags: OrdMap>, - pub mails_by_uid: OrdMap, - + pub flags: HashMap>, pub uidvalidity: ImapUidvalidity, pub uidnext: ImapUid, @@ -61,6 +65,7 @@ impl Default for UidIndex { mail_flags: OrdMap::new(), mail_uid: OrdMap::new(), mails_by_uid: OrdMap::new(), + flags: HashMap::new(), uidvalidity: 1, uidnext: 1, internalseq: 1, @@ -100,12 +105,21 @@ impl BayouState for UidIndex { new.internalseq += 1; } UidIndexOp::FlagAdd(uuid, new_flags) => { + // Upate mapping Email -> Flag let mail_flags = new.mail_flags.entry(*uuid).or_insert(vec![]); for flag in new_flags { if !mail_flags.contains(flag) { mail_flags.push(flag.to_string()); } } + + // Update mapping Flag -> Email + let _ = new_flags.iter().map(|flag| { + new.flags + .entry(flag.clone()) + .or_insert(HashSet::new()) + .update(*uuid) + }); } UidIndexOp::FlagDel(uuid, rm_flags) => { if let Some(mail_flags) = new.mail_flags.get_mut(uuid) { @@ -138,6 +152,7 @@ impl<'de> Deserialize<'de> for UidIndex { mail_flags: OrdMap::new(), mail_uid: OrdMap::new(), mails_by_uid: OrdMap::new(), + flags: HashMap::new(), uidvalidity: val.uidvalidity, uidnext: val.uidnext, internalseq: val.internalseq,