Implement create_mailbox
This commit is contained in:
parent
7efe08a9b8
commit
90c20a840d
2 changed files with 97 additions and 41 deletions
|
@ -22,7 +22,11 @@ pub struct Mailbox {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Mailbox {
|
impl Mailbox {
|
||||||
pub(super) async fn open(creds: &Credentials, id: UniqueIdent, min_uidvalidity: ImapUidvalidity) -> Result<Self> {
|
pub(super) async fn open(
|
||||||
|
creds: &Credentials,
|
||||||
|
id: UniqueIdent,
|
||||||
|
min_uidvalidity: ImapUidvalidity,
|
||||||
|
) -> Result<Self> {
|
||||||
let index_path = format!("index/{}", id);
|
let index_path = format!("index/{}", id);
|
||||||
let mail_path = format!("mail/{}", id);
|
let mail_path = format!("mail/{}", id);
|
||||||
|
|
||||||
|
@ -31,7 +35,13 @@ impl Mailbox {
|
||||||
|
|
||||||
let uidvalidity = uid_index.state().uidvalidity;
|
let uidvalidity = uid_index.state().uidvalidity;
|
||||||
if uidvalidity < min_uidvalidity {
|
if uidvalidity < min_uidvalidity {
|
||||||
uid_index.push(uid_index.state().op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get())).await?;
|
uid_index
|
||||||
|
.push(
|
||||||
|
uid_index
|
||||||
|
.state()
|
||||||
|
.op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mbox = RwLock::new(MailboxInternal {
|
let mbox = RwLock::new(MailboxInternal {
|
||||||
|
|
124
src/mail/user.rs
124
src/mail/user.rs
|
@ -1,17 +1,17 @@
|
||||||
use std::collections::{HashMap, BTreeMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
use anyhow::{Result, bail};
|
use anyhow::{anyhow, bail, Result};
|
||||||
|
use k2v_client::{CausalityToken, K2vClient, K2vValue};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use k2v_client::{K2vClient, CausalityToken, K2vValue};
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use crate::cryptoblob::{seal_serialize, open_deserialize};
|
use crate::cryptoblob::{open_deserialize, seal_serialize};
|
||||||
use crate::login::{Credentials, StorageCredentials};
|
use crate::login::{Credentials, StorageCredentials};
|
||||||
use crate::mail::mailbox::Mailbox;
|
use crate::mail::mailbox::Mailbox;
|
||||||
use crate::mail::unique_ident::{UniqueIdent, gen_ident};
|
|
||||||
use crate::mail::uidindex::ImapUidvalidity;
|
use crate::mail::uidindex::ImapUidvalidity;
|
||||||
|
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
|
||||||
use crate::time::now_msec;
|
use crate::time::now_msec;
|
||||||
|
|
||||||
const MAILBOX_HIERARCHY_DELIMITER: &str = "/";
|
const MAILBOX_HIERARCHY_DELIMITER: &str = "/";
|
||||||
|
@ -71,16 +71,21 @@ impl User {
|
||||||
pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
|
pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
|
||||||
let (list, _ct) = self.load_mailbox_list().await?;
|
let (list, _ct) = self.load_mailbox_list().await?;
|
||||||
match list.get(name) {
|
match list.get(name) {
|
||||||
Some(MailboxListEntry { id_lww: (_, Some(mbid)), uidvalidity }) =>
|
Some(MailboxListEntry {
|
||||||
self.open_mailbox_by_id(*mbid, *uidvalidity).await,
|
id_lww: (_, Some(mbid)),
|
||||||
_ =>
|
uidvalidity,
|
||||||
bail!("Mailbox does not exist: {}", name),
|
}) => self.open_mailbox_by_id(*mbid, *uidvalidity).await,
|
||||||
|
_ => bail!("Mailbox does not exist: {}", name),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new mailbox in the user's IMAP namespace.
|
/// Creates a new mailbox in the user's IMAP namespace.
|
||||||
pub fn create_mailbox(&self, _name: &str) -> Result<()> {
|
pub async fn create_mailbox(&self, name: &str) -> Result<()> {
|
||||||
unimplemented!()
|
let (mut list, ct) = self.load_mailbox_list().await?;
|
||||||
|
match self.create_mailbox_internal(&mut list, ct, name).await? {
|
||||||
|
CreatedMailbox::Created(_, _) => Ok(()),
|
||||||
|
CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes a mailbox in the user's IMAP namespace.
|
/// Deletes a mailbox in the user's IMAP namespace.
|
||||||
|
@ -94,7 +99,7 @@ impl User {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Internal user & mailbox management ----
|
// ---- Internal user & mailbox management ----
|
||||||
|
|
||||||
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
|
||||||
let k2v = creds.k2v_client()?;
|
let k2v = creds.k2v_client()?;
|
||||||
|
|
||||||
|
@ -114,7 +119,11 @@ impl User {
|
||||||
Ok(user)
|
Ok(user)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn open_mailbox_by_id(&self, id: UniqueIdent, min_uidvalidity: ImapUidvalidity) -> Result<Option<Arc<Mailbox>>> {
|
async fn open_mailbox_by_id(
|
||||||
|
&self,
|
||||||
|
id: UniqueIdent,
|
||||||
|
min_uidvalidity: ImapUidvalidity,
|
||||||
|
) -> Result<Option<Arc<Mailbox>>> {
|
||||||
{
|
{
|
||||||
let cache = self.mailboxes.lock().unwrap();
|
let cache = self.mailboxes.lock().unwrap();
|
||||||
if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
|
if let Some(mb) = cache.get(&id).and_then(Weak::upgrade) {
|
||||||
|
@ -155,38 +164,68 @@ impl User {
|
||||||
// and save new mailbox list.
|
// and save new mailbox list.
|
||||||
// Also, ensure that the mpsc::watch that keeps track of the
|
// Also, ensure that the mpsc::watch that keeps track of the
|
||||||
// inbox id is up-to-date.
|
// inbox id is up-to-date.
|
||||||
let (inbox_id, inbox_uidvalidity) = match list.get_mut(INBOX) {
|
let (inbox_id, inbox_uidvalidity) = match self
|
||||||
|
.create_mailbox_internal(&mut list, Some(cv.causality.clone()), INBOX)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
CreatedMailbox::Created(i, v) => (i, v),
|
||||||
|
CreatedMailbox::Existed(i, v) => (i, v),
|
||||||
|
};
|
||||||
|
self.tx_inbox_id
|
||||||
|
.send(Some((inbox_id, inbox_uidvalidity)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok((list, Some(cv.causality)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_mailbox_list(
|
||||||
|
&self,
|
||||||
|
list: &MailboxList,
|
||||||
|
ct: Option<CausalityToken>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
||||||
|
self.k2v
|
||||||
|
.insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_mailbox_internal(
|
||||||
|
&self,
|
||||||
|
list: &mut MailboxList,
|
||||||
|
ct: Option<CausalityToken>,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<CreatedMailbox> {
|
||||||
|
match list.get_mut(name) {
|
||||||
None => {
|
None => {
|
||||||
let (id, uidvalidity) = (gen_ident(), ImapUidvalidity::new(1).unwrap());
|
let (id, uidvalidity) = (gen_ident(), ImapUidvalidity::new(1).unwrap());
|
||||||
list.insert(INBOX.into(), MailboxListEntry {
|
list.insert(
|
||||||
id_lww: (now_msec(), Some(id)),
|
name.into(),
|
||||||
uidvalidity,
|
MailboxListEntry {
|
||||||
});
|
id_lww: (now_msec(), Some(id)),
|
||||||
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
uidvalidity,
|
||||||
(id, uidvalidity)
|
},
|
||||||
|
);
|
||||||
|
self.save_mailbox_list(&list, ct).await?;
|
||||||
|
Ok(CreatedMailbox::Created(id, uidvalidity))
|
||||||
}
|
}
|
||||||
Some(MailboxListEntry { id_lww: id_lww @ (_, None), uidvalidity }) => {
|
Some(MailboxListEntry {
|
||||||
|
id_lww: id_lww @ (_, None),
|
||||||
|
uidvalidity,
|
||||||
|
}) => {
|
||||||
let id = gen_ident();
|
let id = gen_ident();
|
||||||
id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec());
|
id_lww.0 = std::cmp::max(id_lww.0 + 1, now_msec());
|
||||||
id_lww.1 = Some(id);
|
id_lww.1 = Some(id);
|
||||||
*uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap();
|
*uidvalidity = ImapUidvalidity::new(uidvalidity.get() + 1).unwrap();
|
||||||
let uidvalidity = *uidvalidity;
|
let uidvalidity = *uidvalidity;
|
||||||
self.save_mailbox_list(&list, Some(cv.causality.clone())).await?;
|
self.save_mailbox_list(list, ct).await?;
|
||||||
(id, uidvalidity)
|
Ok(CreatedMailbox::Created(id, uidvalidity))
|
||||||
}
|
}
|
||||||
Some(MailboxListEntry { id_lww: (_, Some(id)), uidvalidity }) => {
|
Some(MailboxListEntry {
|
||||||
(*id, *uidvalidity)
|
id_lww: (_, Some(id)),
|
||||||
}
|
uidvalidity,
|
||||||
};
|
}) => Ok(CreatedMailbox::Existed(*id, *uidvalidity)),
|
||||||
self.tx_inbox_id.send(Some((inbox_id, inbox_uidvalidity))).unwrap();
|
}
|
||||||
|
|
||||||
Ok((list, Some(cv.causality)))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn save_mailbox_list(&self, list: &MailboxList, ct: Option<CausalityToken>) -> Result<()> {
|
|
||||||
let list_blob = seal_serialize(list, &self.creds.keys.master)?;
|
|
||||||
self.k2v.insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,7 +243,8 @@ impl MailboxListEntry {
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
// Simple CRDT merge rule
|
// Simple CRDT merge rule
|
||||||
if other.id_lww.0 > self.id_lww.0
|
if other.id_lww.0 > self.id_lww.0
|
||||||
|| (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1) {
|
|| (other.id_lww.0 == self.id_lww.0 && other.id_lww.1 > self.id_lww.1)
|
||||||
|
{
|
||||||
self.id_lww = other.id_lww;
|
self.id_lww = other.id_lww;
|
||||||
}
|
}
|
||||||
self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
|
self.uidvalidity = std::cmp::max(self.uidvalidity, other.uidvalidity);
|
||||||
|
@ -222,8 +262,14 @@ fn merge_mailbox_lists(mut list1: MailboxList, list2: MailboxList) -> MailboxLis
|
||||||
list1
|
list1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum CreatedMailbox {
|
||||||
|
Created(UniqueIdent, ImapUidvalidity),
|
||||||
|
Existed(UniqueIdent, ImapUidvalidity),
|
||||||
|
}
|
||||||
|
|
||||||
// ---- User cache ----
|
// ---- User cache ----
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> = std::sync::Mutex::new(HashMap::new());
|
static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> =
|
||||||
|
std::sync::Mutex::new(HashMap::new());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue