Refactor user to be shared using Arc
This commit is contained in:
parent
89b5883387
commit
b15026ec9c
6 changed files with 88 additions and 46 deletions
|
@ -66,7 +66,7 @@ impl<'a> AnonymousContext<'a> {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
};
|
};
|
||||||
|
|
||||||
let user = User::new(u.clone(), creds)?;
|
let user = User::new(u.clone(), creds).await?;
|
||||||
|
|
||||||
tracing::info!(username=%u, "connected");
|
tracing::info!(username=%u, "connected");
|
||||||
Ok((
|
Ok((
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use boitalettres::proto::{Request, Response};
|
use boitalettres::proto::{Request, Response};
|
||||||
use imap_codec::types::command::{CommandBody, StatusAttribute};
|
use imap_codec::types::command::{CommandBody, StatusAttribute};
|
||||||
|
@ -12,7 +14,7 @@ use crate::mail::user::User;
|
||||||
|
|
||||||
pub struct AuthenticatedContext<'a> {
|
pub struct AuthenticatedContext<'a> {
|
||||||
pub req: &'a Request,
|
pub req: &'a Request,
|
||||||
pub user: &'a User,
|
pub user: &'a Arc<User>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn dispatch<'a>(ctx: AuthenticatedContext<'a>) -> Result<(Response, flow::Transition)> {
|
pub async fn dispatch<'a>(ctx: AuthenticatedContext<'a>) -> Result<(Response, flow::Transition)> {
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use boitalettres::proto::Request;
|
use boitalettres::proto::Request;
|
||||||
use boitalettres::proto::Response;
|
use boitalettres::proto::Response;
|
||||||
|
@ -15,7 +17,7 @@ use crate::mail::user::User;
|
||||||
|
|
||||||
pub struct ExaminedContext<'a> {
|
pub struct ExaminedContext<'a> {
|
||||||
pub req: &'a Request,
|
pub req: &'a Request,
|
||||||
pub user: &'a User,
|
pub user: &'a Arc<User>,
|
||||||
pub mailbox: &'a mut MailboxView,
|
pub mailbox: &'a mut MailboxView,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use boitalettres::proto::Request;
|
use boitalettres::proto::Request;
|
||||||
use boitalettres::proto::Response;
|
use boitalettres::proto::Response;
|
||||||
|
@ -16,7 +18,7 @@ use crate::mail::user::User;
|
||||||
|
|
||||||
pub struct SelectedContext<'a> {
|
pub struct SelectedContext<'a> {
|
||||||
pub req: &'a Request,
|
pub req: &'a Request,
|
||||||
pub user: &'a User,
|
pub user: &'a Arc<User>,
|
||||||
pub mailbox: &'a mut MailboxView,
|
pub mailbox: &'a mut MailboxView,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::imap::mailbox_view::MailboxView;
|
use crate::imap::mailbox_view::MailboxView;
|
||||||
use crate::mail::user::User;
|
use crate::mail::user::User;
|
||||||
|
@ -17,16 +18,16 @@ impl StdError for Error {}
|
||||||
|
|
||||||
pub enum State {
|
pub enum State {
|
||||||
NotAuthenticated,
|
NotAuthenticated,
|
||||||
Authenticated(User),
|
Authenticated(Arc<User>),
|
||||||
Selected(User, MailboxView),
|
Selected(Arc<User>, MailboxView),
|
||||||
// Examined is like Selected, but indicates that the mailbox is read-only
|
// Examined is like Selected, but indicates that the mailbox is read-only
|
||||||
Examined(User, MailboxView),
|
Examined(Arc<User>, MailboxView),
|
||||||
Logout,
|
Logout,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Transition {
|
pub enum Transition {
|
||||||
None,
|
None,
|
||||||
Authenticate(User),
|
Authenticate(Arc<User>),
|
||||||
Examine(MailboxView),
|
Examine(MailboxView),
|
||||||
Select(MailboxView),
|
Select(MailboxView),
|
||||||
Unselect,
|
Unselect,
|
||||||
|
|
111
src/mail/user.rs
111
src/mail/user.rs
|
@ -5,6 +5,7 @@ use anyhow::{Result, bail};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
use k2v_client::{K2vClient, CausalityToken, K2vValue};
|
use k2v_client::{K2vClient, CausalityToken, K2vValue};
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use crate::cryptoblob::{seal_serialize, open_deserialize};
|
use crate::cryptoblob::{seal_serialize, open_deserialize};
|
||||||
use crate::login::{Credentials, StorageCredentials};
|
use crate::login::{Credentials, StorageCredentials};
|
||||||
|
@ -25,20 +26,39 @@ const MAILBOX_HIERARCHY_DELIMITER: &str = "/";
|
||||||
/// INBOX), and we create a new empty mailbox for INBOX.
|
/// INBOX), and we create a new empty mailbox for INBOX.
|
||||||
const INBOX: &str = "INBOX";
|
const INBOX: &str = "INBOX";
|
||||||
|
|
||||||
|
const MAILBOX_LIST_PK: &str = "mailboxes";
|
||||||
|
const MAILBOX_LIST_SK: &str = "list";
|
||||||
|
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub username: String,
|
pub username: String,
|
||||||
pub creds: Credentials,
|
pub creds: Credentials,
|
||||||
pub k2v: K2vClient,
|
pub k2v: K2vClient,
|
||||||
|
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
|
||||||
|
|
||||||
|
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl User {
|
impl User {
|
||||||
pub fn new(username: String, creds: Credentials) -> Result<Self> {
|
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
||||||
let k2v = creds.k2v_client()?;
|
let cache_key = (username.clone(), creds.storage.clone());
|
||||||
Ok(Self {
|
|
||||||
username,
|
{
|
||||||
creds,
|
let cache = USER_CACHE.lock().unwrap();
|
||||||
k2v,
|
if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) {
|
||||||
})
|
return Ok(u);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let user = Self::open(username, creds).await?;
|
||||||
|
|
||||||
|
let mut cache = USER_CACHE.lock().unwrap();
|
||||||
|
if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) {
|
||||||
|
drop(user);
|
||||||
|
Ok(concurrent_user)
|
||||||
|
} else {
|
||||||
|
cache.insert(cache_key, Arc::downgrade(&user));
|
||||||
|
Ok(user)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lists user's available mailboxes
|
/// Lists user's available mailboxes
|
||||||
|
@ -73,26 +93,43 @@ impl User {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Internal mailbox management ----
|
// ---- Internal user & mailbox management ----
|
||||||
|
|
||||||
|
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
||||||
|
let k2v = creds.k2v_client()?;
|
||||||
|
|
||||||
|
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
|
||||||
|
|
||||||
|
let user = Arc::new(Self {
|
||||||
|
username,
|
||||||
|
creds,
|
||||||
|
k2v,
|
||||||
|
tx_inbox_id,
|
||||||
|
mailboxes: std::sync::Mutex::new(HashMap::new()),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure INBOX exists (done inside load_mailbox_list)
|
||||||
|
user.load_mailbox_list().await?;
|
||||||
|
|
||||||
|
Ok(user)
|
||||||
|
}
|
||||||
|
|
||||||
async fn open_mailbox_by_id(&self, id: UniqueIdent, min_uidvalidity: ImapUidvalidity) -> Result<Option<Arc<Mailbox>>> {
|
async fn open_mailbox_by_id(&self, id: UniqueIdent, min_uidvalidity: ImapUidvalidity) -> Result<Option<Arc<Mailbox>>> {
|
||||||
let cache_key = (self.creds.storage.clone(), id);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let cache = MAILBOX_CACHE.cache.lock().unwrap();
|
let cache = self.mailboxes.lock().unwrap();
|
||||||
if let Some(mb) = cache.get(&cache_key).and_then(Weak::upgrade) {
|
if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
|
||||||
return Ok(Some(mb));
|
return Ok(Some(mb));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?);
|
let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?);
|
||||||
|
|
||||||
let mut cache = MAILBOX_CACHE.cache.lock().unwrap();
|
let mut cache = self.mailboxes.lock().unwrap();
|
||||||
if let Some(concurrent_mb) = cache.get(&cache_key).and_then(Weak::upgrade) {
|
if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) {
|
||||||
drop(mb); // we worked for nothing but at least we didn't starve someone else
|
drop(mb); // we worked for nothing but at least we didn't starve someone else
|
||||||
Ok(Some(concurrent_mb))
|
Ok(Some(concurrent_mb))
|
||||||
} else {
|
} else {
|
||||||
cache.insert(cache_key, Arc::downgrade(&mb));
|
cache.insert(id, Arc::downgrade(&mb));
|
||||||
Ok(Some(mb))
|
Ok(Some(mb))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -100,7 +137,7 @@ impl User {
|
||||||
// ---- Mailbox list management ----
|
// ---- Mailbox list management ----
|
||||||
|
|
||||||
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> {
|
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> {
|
||||||
let cv = match self.k2v.read_item("mailboxes", "list").await {
|
let cv = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await {
|
||||||
Err(k2v_client::Error::NotFound) => return Ok((BTreeMap::new(), None)),
|
Err(k2v_client::Error::NotFound) => return Ok((BTreeMap::new(), None)),
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
Ok(cv) => cv,
|
Ok(cv) => cv,
|
||||||
|
@ -116,29 +153,39 @@ impl User {
|
||||||
|
|
||||||
// If INBOX doesn't exist, create a new mailbox with that name
|
// If INBOX doesn't exist, create a new mailbox with that name
|
||||||
// and save new mailbox list.
|
// and save new mailbox list.
|
||||||
match list.get_mut(INBOX) {
|
// Also, ensure that the mpsc::watch that keeps track of the
|
||||||
|
// inbox id is up-to-date.
|
||||||
|
let (inbox_id, inbox_uidvalidity) = match list.get_mut(INBOX) {
|
||||||
None => {
|
None => {
|
||||||
|
let (id, uidvalidity) = (gen_ident(), ImapUidvalidity::new(1).unwrap());
|
||||||
list.insert(INBOX.into(), MailboxListEntry {
|
list.insert(INBOX.into(), MailboxListEntry {
|
||||||
id_lww: (now_msec(), Some(gen_ident())),
|
id_lww: (now_msec(), Some(id)),
|
||||||
uidvalidity: ImapUidvalidity::new(1).unwrap(),
|
uidvalidity,
|
||||||
});
|
});
|
||||||
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
||||||
|
(id, uidvalidity)
|
||||||
}
|
}
|
||||||
Some(MailboxListEntry { id_lww, uidvalidity }) if id_lww.1.is_none() => {
|
Some(MailboxListEntry { id_lww: id_lww @ (_, None), uidvalidity }) => {
|
||||||
|
let id = gen_ident();
|
||||||
id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec());
|
id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec());
|
||||||
id_lww.1 = Some(gen_ident());
|
id_lww.1 = Some(id);
|
||||||
*uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap();
|
*uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap();
|
||||||
|
let uidvalidity = *uidvalidity;
|
||||||
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
||||||
|
(id, uidvalidity)
|
||||||
}
|
}
|
||||||
_ => (),
|
Some(MailboxListEntry { id_lww: (_, Some(id)), uidvalidity }) => {
|
||||||
}
|
(*id, *uidvalidity)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.tx_inbox_id.send(Some((inbox_id, inbox_uidvalidity))).unwrap();
|
||||||
|
|
||||||
Ok((list, Some(cv.causality)))
|
Ok((list, Some(cv.causality)))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save_mailbox_list(&self, list: &MailboxList, ct: Option<CausalityToken>) -> Result<()> {
|
async fn save_mailbox_list(&self, list: &MailboxList, ct: Option<CausalityToken>) -> Result<()> {
|
||||||
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
||||||
self.k2v.insert_item("mailboxes", "list", list_blob, ct).await?;
|
self.k2v.insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -175,20 +222,8 @@ fn merge_mailbox_lists(mut list1: MailboxList, list2: MailboxList) -> MailboxLis
|
||||||
list1
|
list1
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Mailbox cache ----
|
// ---- User cache ----
|
||||||
|
|
||||||
struct MailboxCache {
|
|
||||||
cache: std::sync::Mutex<HashMap<(StorageCredentials, UniqueIdent), Weak<Mailbox>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MailboxCache {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
cache: std::sync::Mutex::new(HashMap::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref MAILBOX_CACHE: MailboxCache = MailboxCache::new();
|
static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> = std::sync::Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue