From b15026ec9ca2045f7ddb21a759cb075bb1dbd014 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 30 Jun 2022 14:02:57 +0200 Subject: [PATCH] Refactor user to be shared using Arc --- src/imap/command/anonymous.rs | 2 +- src/imap/command/authenticated.rs | 4 +- src/imap/command/examined.rs | 4 +- src/imap/command/selected.rs | 4 +- src/imap/flow.rs | 9 +-- src/mail/user.rs | 111 ++++++++++++++++++++---------- 6 files changed, 88 insertions(+), 46 deletions(-) diff --git a/src/imap/command/anonymous.rs b/src/imap/command/anonymous.rs index b84b0da..3ac1f20 100644 --- a/src/imap/command/anonymous.rs +++ b/src/imap/command/anonymous.rs @@ -66,7 +66,7 @@ impl<'a> AnonymousContext<'a> { Ok(c) => c, }; - let user = User::new(u.clone(), creds)?; + let user = User::new(u.clone(), creds).await?; tracing::info!(username=%u, "connected"); Ok(( diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index dfcb52e..0b34223 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use boitalettres::proto::{Request, Response}; use imap_codec::types::command::{CommandBody, StatusAttribute}; @@ -12,7 +14,7 @@ use crate::mail::user::User; pub struct AuthenticatedContext<'a> { pub req: &'a Request, - pub user: &'a User, + pub user: &'a Arc, } pub async fn dispatch<'a>(ctx: AuthenticatedContext<'a>) -> Result<(Response, flow::Transition)> { diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs index aad7874..91ad950 100644 --- a/src/imap/command/examined.rs +++ b/src/imap/command/examined.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use boitalettres::proto::Request; use boitalettres::proto::Response; @@ -15,7 +17,7 @@ use crate::mail::user::User; pub struct ExaminedContext<'a> { pub req: &'a Request, - pub user: &'a User, + pub user: &'a Arc, pub mailbox: &'a mut MailboxView, } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index a447a49..bb78cbd 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Result; use boitalettres::proto::Request; use boitalettres::proto::Response; @@ -16,7 +18,7 @@ use crate::mail::user::User; pub struct SelectedContext<'a> { pub req: &'a Request, - pub user: &'a User, + pub user: &'a Arc, pub mailbox: &'a mut MailboxView, } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index feb78ac..303b498 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,5 +1,6 @@ use std::error::Error as StdError; use std::fmt; +use std::sync::Arc; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; @@ -17,16 +18,16 @@ impl StdError for Error {} pub enum State { NotAuthenticated, - Authenticated(User), - Selected(User, MailboxView), + Authenticated(Arc), + Selected(Arc, MailboxView), // Examined is like Selected, but indicates that the mailbox is read-only - Examined(User, MailboxView), + Examined(Arc, MailboxView), Logout, } pub enum Transition { None, - Authenticate(User), + Authenticate(Arc), Examine(MailboxView), Select(MailboxView), Unselect, diff --git a/src/mail/user.rs b/src/mail/user.rs index bccb9ed..b760e17 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -5,6 +5,7 @@ use anyhow::{Result, bail}; use lazy_static::lazy_static; use serde::{Serialize, Deserialize}; use k2v_client::{K2vClient, CausalityToken, K2vValue}; +use tokio::sync::watch; use crate::cryptoblob::{seal_serialize, open_deserialize}; use crate::login::{Credentials, StorageCredentials}; @@ -25,20 +26,39 @@ const MAILBOX_HIERARCHY_DELIMITER: &str = "/"; /// INBOX), and we create a new empty mailbox for INBOX. const INBOX: &str = "INBOX"; +const MAILBOX_LIST_PK: &str = "mailboxes"; +const MAILBOX_LIST_SK: &str = "list"; + pub struct User { pub username: String, pub creds: Credentials, pub k2v: K2vClient, + pub mailboxes: std::sync::Mutex>>, + + tx_inbox_id: watch::Sender>, } impl User { - pub fn new(username: String, creds: Credentials) -> Result { - let k2v = creds.k2v_client()?; - Ok(Self { - username, - creds, - k2v, - }) + pub async fn new(username: String, creds: Credentials) -> Result> { + let cache_key = (username.clone(), creds.storage.clone()); + + { + let cache = USER_CACHE.lock().unwrap(); + if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) { + return Ok(u); + } + } + + let user = Self::open(username, creds).await?; + + let mut cache = USER_CACHE.lock().unwrap(); + if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) { + drop(user); + Ok(concurrent_user) + } else { + cache.insert(cache_key, Arc::downgrade(&user)); + Ok(user) + } } /// Lists user's available mailboxes @@ -73,26 +93,43 @@ impl User { unimplemented!() } - // ---- Internal mailbox management ---- + // ---- Internal user & mailbox management ---- + + async fn open(username: String, creds: Credentials) -> Result> { + let k2v = creds.k2v_client()?; + + let (tx_inbox_id, rx_inbox_id) = watch::channel(None); + + let user = Arc::new(Self { + username, + creds, + k2v, + tx_inbox_id, + mailboxes: std::sync::Mutex::new(HashMap::new()), + }); + + // Ensure INBOX exists (done inside load_mailbox_list) + user.load_mailbox_list().await?; + + Ok(user) + } async fn open_mailbox_by_id(&self, id: UniqueIdent, min_uidvalidity: ImapUidvalidity) -> Result>> { - let cache_key = (self.creds.storage.clone(), id); - { - let cache = MAILBOX_CACHE.cache.lock().unwrap(); - if let Some(mb) = cache.get(&cache_key).and_then(Weak::upgrade) { + let cache = self.mailboxes.lock().unwrap(); + if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) { return Ok(Some(mb)); } } let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?); - let mut cache = MAILBOX_CACHE.cache.lock().unwrap(); - if let Some(concurrent_mb) = cache.get(&cache_key).and_then(Weak::upgrade) { + let mut cache = self.mailboxes.lock().unwrap(); + if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) { drop(mb); // we worked for nothing but at least we didn't starve someone else Ok(Some(concurrent_mb)) } else { - cache.insert(cache_key, Arc::downgrade(&mb)); + cache.insert(id, Arc::downgrade(&mb)); Ok(Some(mb)) } } @@ -100,7 +137,7 @@ impl User { // ---- Mailbox list management ---- async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let cv = match self.k2v.read_item("mailboxes", "list").await { + let cv = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { Err(k2v_client::Error::NotFound) => return Ok((BTreeMap::new(), None)), Err(e) => return Err(e.into()), Ok(cv) => cv, @@ -116,29 +153,39 @@ impl User { // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. - match list.get_mut(INBOX) { + // Also, ensure that the mpsc::watch that keeps track of the + // inbox id is up-to-date. + let (inbox_id, inbox_uidvalidity) = match list.get_mut(INBOX) { None => { + let (id, uidvalidity) = (gen_ident(), ImapUidvalidity::new(1).unwrap()); list.insert(INBOX.into(), MailboxListEntry { - id_lww: (now_msec(), Some(gen_ident())), - uidvalidity: ImapUidvalidity::new(1).unwrap(), + id_lww: (now_msec(), Some(id)), + uidvalidity, }); self.save_mailbox_list(&list, Some(cv.causality.clone())).await?; + (id, uidvalidity) } - Some(MailboxListEntry { id_lww, uidvalidity }) if id_lww.1.is_none() => { + Some(MailboxListEntry { id_lww: id_lww @ (_, None), uidvalidity }) => { + let id = gen_ident(); id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec()); - id_lww.1 = Some(gen_ident()); + id_lww.1 = Some(id); *uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap(); + let uidvalidity = *uidvalidity; self.save_mailbox_list(&list, Some(cv.causality.clone())).await?; + (id, uidvalidity) } - _ => (), - } + Some(MailboxListEntry { id_lww: (_, Some(id)), uidvalidity }) => { + (*id, *uidvalidity) + } + }; + self.tx_inbox_id.send(Some((inbox_id, inbox_uidvalidity))).unwrap(); Ok((list, Some(cv.causality))) } async fn save_mailbox_list(&self, list: &MailboxList, ct: Option) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - self.k2v.insert_item("mailboxes", "list", list_blob, ct).await?; + self.k2v.insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct).await?; Ok(()) } } @@ -175,20 +222,8 @@ fn merge_mailbox_lists(mut list1: MailboxList, list2: MailboxList) -> MailboxLis list1 } -// ---- Mailbox cache ---- - -struct MailboxCache { - cache: std::sync::Mutex>>, -} - -impl MailboxCache { - fn new() -> Self { - Self { - cache: std::sync::Mutex::new(HashMap::new()), - } - } -} +// ---- User cache ---- lazy_static! { - static ref MAILBOX_CACHE: MailboxCache = MailboxCache::new(); + static ref USER_CACHE: std::sync::Mutex>> = std::sync::Mutex::new(HashMap::new()); }