// aerogramme/src/mail/user.rs

use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, Weak};

use anyhow::{anyhow, bail, Result};
use k2v_client::{CausalityToken, K2vClient, K2vValue};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;

use crate::cryptoblob::{open_deserialize, seal_serialize};
use crate::login::{Credentials, StorageCredentials};
use crate::mail::incoming::incoming_mail_watch_process;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
use crate::time::now_msec;

/// Separator between the levels of a hierarchical mailbox name.
const MAILBOX_HIERARCHY_DELIMITER: &str = "/";

/// INBOX is the only mailbox that must always exist.
/// It is created automatically when the account is created.
/// IMAP allows the user to rename INBOX to something else;
/// in that case all messages from INBOX are moved to a mailbox
/// with the new name, and the INBOX mailbox still exists and is empty.
/// In our implementation, we indeed move the underlying mailbox
/// to the new name (i.e. the new name has the same id as the previous
/// INBOX), and we create a new empty mailbox for INBOX.
const INBOX: &str = "INBOX";

/// K2V partition key under which the mailbox list is stored.
const MAILBOX_LIST_PK: &str = "mailboxes";
/// K2V sort key under which the mailbox list is stored.
const MAILBOX_LIST_SK: &str = "list";

pub struct User {
pub username: String,
pub creds: Credentials,
pub k2v: K2vClient,
2022-06-30 12:02:57 +00:00
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
2022-06-29 11:16:58 +00:00
}
impl User {
2022-06-30 12:02:57 +00:00
pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> {
let cache_key = (username.clone(), creds.storage.clone());
{
let cache = USER_CACHE.lock().unwrap();
if let Some(u) = cache.get(&cache_key).and_then(Weak::upgrade) {
return Ok(u);
}
}
let user = Self::open(username, creds).await?;
let mut cache = USER_CACHE.lock().unwrap();
if let Some(concurrent_user) = cache.get(&cache_key).and_then(Weak::upgrade) {
drop(user);
Ok(concurrent_user)
} else {
cache.insert(cache_key, Arc::downgrade(&user));
Ok(user)
}
2022-06-29 11:16:58 +00:00
}
2022-06-29 11:41:05 +00:00
/// Lists the names of the user's available mailboxes.
///
/// Entries of the mailbox list that carry no mailbox identifier
/// (`id_lww.1 == None`) are skipped: `open_mailbox` reports such names as
/// non-existent and `create_mailbox_internal` treats them as free, so they
/// must not be advertised as available.
///
/// # Errors
/// Propagates any error returned while loading the mailbox list.
pub async fn list_mailboxes(&self) -> Result<Vec<String>> {
    let (list, _ct) = self.load_mailbox_list().await?;
    Ok(list
        .into_iter()
        .filter(|(_, entry)| entry.id_lww.1.is_some())
        .map(|(name, _)| name)
        .collect())
}
/// Opens an existing mailbox given its IMAP name.
pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
let (mut list, ct) = self.load_mailbox_list().await?;
2022-06-30 14:18:08 +00:00
eprintln!("List of mailboxes: {:?}", list);
match list.get_mut(name) {
2022-06-30 12:17:54 +00:00
Some(MailboxListEntry {
id_lww: (_, Some(mbid)),
uidvalidity,
}) => {
let mb_opt = self.open_mailbox_by_id(*mbid, *uidvalidity).await?;
if let Some(mb) = &mb_opt {
let mb_uidvalidity = mb.current_uid_index().await.uidvalidity;
if mb_uidvalidity > *uidvalidity {
*uidvalidity = mb_uidvalidity;
self.save_mailbox_list(&list, ct).await?;
}
}
Ok(mb_opt)
}
2022-06-30 12:17:54 +00:00
_ => bail!("Mailbox does not exist: {}", name),
}
}
/// Creates a new mailbox in the user's IMAP namespace.
2022-06-30 12:17:54 +00:00
pub async fn create_mailbox(&self, name: &str) -> Result<()> {
let (mut list, ct) = self.load_mailbox_list().await?;
match self.create_mailbox_internal(&mut list, ct, name).await? {
CreatedMailbox::Created(_, _) => Ok(()),
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
}
}
/// Deletes a mailbox in the user's IMAP namespace.
///
/// Not implemented yet; the `_name` argument is ignored.
///
/// # Panics
/// Always panics through `unimplemented!()`.
pub fn delete_mailbox(&self, _name: &str) -> Result<()> {
unimplemented!()
}
/// Renames a mailbox in the user's IMAP namespace.
///
/// Not implemented yet; both arguments are ignored.
///
/// # Panics
/// Always panics through `unimplemented!()`.
pub fn rename_mailbox(&self, _old_name: &str, _new_name: &str) -> Result<()> {
unimplemented!()
}
2022-06-30 12:02:57 +00:00
// ---- Internal user & mailbox management ----
2022-06-30 12:17:54 +00:00
2022-06-30 12:02:57 +00:00
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
let k2v = creds.k2v_client()?;
2022-06-30 12:02:57 +00:00
let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
2022-06-30 12:02:57 +00:00
let user = Arc::new(Self {
username,
creds: creds.clone(),
2022-06-30 12:02:57 +00:00
k2v,
tx_inbox_id,
mailboxes: std::sync::Mutex::new(HashMap::new()),
});
// Ensure INBOX exists (done inside load_mailbox_list)
user.load_mailbox_list().await?;
tokio::spawn(incoming_mail_watch_process(
Arc::downgrade(&user),
user.creds.clone(),
rx_inbox_id,
));
2022-06-30 14:18:08 +00:00
2022-06-30 12:02:57 +00:00
Ok(user)
}
pub(super) async fn open_mailbox_by_id(
2022-06-30 12:17:54 +00:00
&self,
id: UniqueIdent,
min_uidvalidity: ImapUidvalidity,
) -> Result<Option<Arc<Mailbox>>> {
{
2022-06-30 12:02:57 +00:00
let cache = self.mailboxes.lock().unwrap();
if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
return Ok(Some(mb));
}
}
let mb = Arc::new(Mailbox::open(&self.creds, id, min_uidvalidity).await?);
2022-06-30 12:02:57 +00:00
let mut cache = self.mailboxes.lock().unwrap();
if let Some(concurrent_mb) = cache.get(&id).and_then(Weak::upgrade) {
drop(mb); // we worked for nothing but at least we didn't starve someone else
Ok(Some(concurrent_mb))
} else {
2022-06-30 12:02:57 +00:00
cache.insert(id, Arc::downgrade(&mb));
Ok(Some(mb))
}
2022-06-29 11:41:05 +00:00
}
// ---- Mailbox list management ----
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> {
2022-06-30 14:18:08 +00:00
let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await {
Err(k2v_client::Error::NotFound) => (BTreeMap::new(), None),
Err(e) => return Err(e.into()),
2022-06-30 14:18:08 +00:00
Ok(cv) => {
let mut list = BTreeMap::new();
for v in cv.value {
if let K2vValue::Value(vbytes) = v {
let list2 =
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
2022-06-30 14:18:08 +00:00
list = merge_mailbox_lists(list, list2);
}
}
(list, Some(cv.causality))
}
};
// If INBOX doesn't exist, create a new mailbox with that name
// and save new mailbox list.
2022-06-30 12:02:57 +00:00
// Also, ensure that the mpsc::watch that keeps track of the
// inbox id is up-to-date.
2022-06-30 12:17:54 +00:00
let (inbox_id, inbox_uidvalidity) = match self
2022-06-30 14:18:08 +00:00
.create_mailbox_internal(&mut list, ct.clone(), INBOX)
2022-06-30 12:17:54 +00:00
.await?
{
CreatedMailbox::Created(i, v) => (i, v),
CreatedMailbox::Existed(i, v) => (i, v),
};
self.tx_inbox_id
.send(Some((inbox_id, inbox_uidvalidity)))
.unwrap();
2022-06-30 14:18:08 +00:00
Ok((list, ct))
2022-06-30 12:17:54 +00:00
}
/// Seals `list` with the user's master key and stores it in K2V under the
/// mailbox-list item, passing `ct` as the causality token of the write.
///
/// # Errors
/// Fails if sealing/serializing the list or the K2V insertion fails.
async fn save_mailbox_list(
    &self,
    list: &MailboxList,
    ct: Option<CausalityToken>,
) -> Result<()> {
    let master_key = &self.creds.keys.master;
    let sealed = seal_serialize(list, master_key)?;

    let write = self
        .k2v
        .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, sealed, ct);
    write.await?;

    Ok(())
}
async fn create_mailbox_internal(
&self,
list: &mut MailboxList,
ct: Option<CausalityToken>,
name: &str,
) -> Result<CreatedMailbox> {
match list.get_mut(name) {
None => {
2022-06-30 12:02:57 +00:00
let (id, uidvalidity) = (gen_ident(), ImapUidvalidity::new(1).unwrap());
2022-06-30 12:17:54 +00:00
list.insert(
name.into(),
MailboxListEntry {
id_lww: (now_msec(), Some(id)),
uidvalidity,
},
);
self.save_mailbox_list(&list, ct).await?;
Ok(CreatedMailbox::Created(id, uidvalidity))
}
2022-06-30 12:17:54 +00:00
Some(MailboxListEntry {
id_lww: id_lww @ (_, None),
uidvalidity,
}) => {
2022-06-30 12:02:57 +00:00
let id = gen_ident();
id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec());
2022-06-30 12:02:57 +00:00
id_lww.1 = Some(id);
*uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap();
2022-06-30 12:02:57 +00:00
let uidvalidity = *uidvalidity;
2022-06-30 12:17:54 +00:00
self.save_mailbox_list(list, ct).await?;
Ok(CreatedMailbox::Created(id, uidvalidity))
}
2022-06-30 12:17:54 +00:00
Some(MailboxListEntry {
id_lww: (_, Some(id)),
uidvalidity,
}) => Ok(CreatedMailbox::Existed(*id, *uidvalidity)),
}
2022-06-29 11:41:05 +00:00
}
}
2022-06-29 11:41:05 +00:00
// ---- User's mailbox list (serialized in K2V) ----
type MailboxList = BTreeMap<String, MailboxListEntry>;
2022-06-30 14:18:08 +00:00
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
struct MailboxListEntry {
id_lww: (u64, Option<UniqueIdent>),
uidvalidity: ImapUidvalidity,
}
impl MailboxListEntry {
fn merge(&mut self, other: &Self) {
// Simple CRDT merge rule
if other.id_lww.0 > self.id_lww.0
2022-06-30 12:17:54 +00:00
|| (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
{
self.id_lww = other.id_lww;
}
self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
}
}
fn merge_mailbox_lists(mut list1: MailboxList, list2: MailboxList) -> MailboxList {
for (k, v) in list2.into_iter() {
if let Some(e) = list1.get_mut(&k) {
e.merge(&v);
} else {
list1.insert(k, v);
}
2022-06-29 11:16:58 +00:00
}
list1
2022-06-29 11:16:58 +00:00
}
2022-06-30 12:17:54 +00:00
enum CreatedMailbox {
Created(UniqueIdent, ImapUidvalidity),
Existed(UniqueIdent, ImapUidvalidity),
}
2022-06-30 12:02:57 +00:00
// ---- User cache ----

lazy_static! {
    /// Process-wide cache of opened users, keyed by username and storage
    /// credentials. Only weak references are stored, so the cache does not
    /// keep a user alive by itself.
    static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> =
        std::sync::Mutex::new(HashMap::new());
}