aerogramme/src/mail/mailbox.rs

472 lines
15 KiB
Rust
Raw Normal View History

2022-06-29 17:24:21 +00:00
use anyhow::{anyhow, bail, Result};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
2022-06-29 11:16:58 +00:00
use crate::bayou::Bayou;
2022-06-30 10:07:01 +00:00
use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
2022-06-29 11:16:58 +00:00
use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
2022-06-29 11:16:58 +00:00
use crate::mail::IMF;
2023-11-02 15:17:11 +00:00
use crate::storage::{RowStore, BlobStore, self};
use crate::timestamp::now_msec;
2022-06-29 11:16:58 +00:00
/// Public handle on one mailbox.
///
/// All state lives in a [`MailboxInternal`] kept behind an async
/// read/write lock; the methods of this type only acquire the lock and
/// forward the call.
pub struct Mailbox {
    /// Unique identifier of this mailbox.
    pub(super) id: UniqueIdent,
    /// Lock-protected internal state.
    mbox: RwLock<MailboxInternal>,
}
2022-06-29 11:41:05 +00:00
2022-06-29 11:16:58 +00:00
impl Mailbox {
2022-06-30 12:17:54 +00:00
pub(super) async fn open(
creds: &Credentials,
id: UniqueIdent,
min_uidvalidity: ImapUidvalidity,
) -> Result<Self> {
let index_path = format!("index/{}", id);
let mail_path = format!("mail/{}", id);
2022-06-29 11:16:58 +00:00
let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
uid_index.sync().await?;
let uidvalidity = uid_index.state().uidvalidity;
if uidvalidity < min_uidvalidity {
2022-06-30 12:17:54 +00:00
uid_index
.push(
uid_index
.state()
.op_bump_uidvalidity(min_uidvalidity.get() - uidvalidity.get()),
)
.await?;
}
2022-07-12 13:35:25 +00:00
dump(&uid_index);
let mbox = RwLock::new(MailboxInternal {
id,
encryption_key: creds.keys.master.clone(),
2023-11-02 14:28:19 +00:00
k2v: creds.storage.row_store()?,
s3: creds.storage.blob_store()?,
2022-06-29 11:16:58 +00:00
uid_index,
mail_path,
});
Ok(Self { id, mbox })
2022-06-29 11:16:58 +00:00
}
2022-06-29 15:58:31 +00:00
/// Sync data with backing store
pub async fn force_sync(&self) -> Result<()> {
self.mbox.write().await.force_sync().await
}
/// Sync data with backing store only if changes are detected
/// or last sync is too old
pub async fn opportunistic_sync(&self) -> Result<()> {
self.mbox.write().await.opportunistic_sync().await
2022-06-29 15:58:31 +00:00
}
2022-06-30 08:59:42 +00:00
// ---- Functions for reading the mailbox ----
/// Get a clone of the current UID Index of this mailbox
/// (cloning is cheap so don't hesitate to use this)
pub async fn current_uid_index(&self) -> UidIndex {
self.mbox.read().await.uid_index.state().clone()
2022-06-29 11:16:58 +00:00
}
2022-06-30 08:59:42 +00:00
/// Fetch the metadata (headers + some more info) of the specified
/// mail IDs
pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
self.mbox.read().await.fetch_meta(ids).await
}
/// Fetch an entire e-mail
pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
self.mbox.read().await.fetch_full(id, message_key).await
}
// ---- Functions for changing the mailbox ----
/// Add flags to message
pub async fn add_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
self.mbox.write().await.add_flags(id, flags).await
}
/// Delete flags from message
pub async fn del_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
self.mbox.write().await.del_flags(id, flags).await
}
2022-07-13 09:00:35 +00:00
/// Define the new flags for this message
pub async fn set_flags<'a>(&self, id: UniqueIdent, flags: &[Flag]) -> Result<()> {
self.mbox.write().await.set_flags(id, flags).await
}
2022-06-30 08:59:42 +00:00
/// Insert an email into the mailbox
2022-07-12 14:35:11 +00:00
pub async fn append<'a>(
&self,
msg: IMF<'a>,
ident: Option<UniqueIdent>,
flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> {
self.mbox.write().await.append(msg, ident, flags).await
2022-06-30 08:59:42 +00:00
}
2022-07-01 18:09:26 +00:00
/// Insert an email into the mailbox, copying it from an existing S3 object
pub async fn append_from_s3<'a>(
&self,
msg: IMF<'a>,
ident: UniqueIdent,
2023-11-02 15:17:11 +00:00
blob_ref: storage::BlobRef,
2022-07-01 18:09:26 +00:00
message_key: Key,
) -> Result<()> {
self.mbox
.write()
.await
2023-11-02 15:17:11 +00:00
.append_from_s3(msg, ident, blob_ref, message_key)
2022-07-01 18:09:26 +00:00
.await
}
2022-06-30 08:59:42 +00:00
/// Delete a message definitively from the mailbox
pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
self.mbox.write().await.delete(id).await
2022-06-29 11:16:58 +00:00
}
2022-06-29 11:41:05 +00:00
/// Copy an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations)
2022-07-21 10:44:58 +00:00
pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<UniqueIdent> {
2022-06-30 10:07:01 +00:00
if self.id == from.id {
bail!("Cannot copy into same mailbox");
}
2022-07-12 13:31:29 +00:00
let (mut selflock, fromlock);
2022-06-30 10:07:01 +00:00
if self.id < from.id {
selflock = self.mbox.write().await;
fromlock = from.mbox.write().await;
} else {
fromlock = from.mbox.write().await;
selflock = self.mbox.write().await;
};
selflock.copy_from(&fromlock, uuid).await
2022-06-29 18:00:38 +00:00
}
/// Move an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations)
2023-05-15 16:23:23 +00:00
#[allow(dead_code)]
2022-06-30 10:07:01 +00:00
pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
if self.id == from.id {
bail!("Cannot copy move same mailbox");
}
let (mut selflock, mut fromlock);
if self.id < from.id {
selflock = self.mbox.write().await;
fromlock = from.mbox.write().await;
} else {
fromlock = from.mbox.write().await;
selflock = self.mbox.write().await;
};
selflock.move_from(&mut fromlock, uuid).await
2022-06-29 11:16:58 +00:00
}
}
// ----
// Non standard but common flags:
// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
struct MailboxInternal {
2023-05-15 16:23:23 +00:00
// 2023-05-15 will probably be used later.
#[allow(dead_code)]
id: UniqueIdent,
mail_path: String,
encryption_key: Key,
2022-06-29 11:16:58 +00:00
2023-11-02 10:51:03 +00:00
k2v: RowStore,
s3: BlobStore,
uid_index: Bayou<UidIndex>,
}
2022-06-29 11:41:05 +00:00
impl MailboxInternal {
async fn force_sync(&mut self) -> Result<()> {
2022-06-29 18:00:38 +00:00
self.uid_index.sync().await?;
Ok(())
}
async fn opportunistic_sync(&mut self) -> Result<()> {
self.uid_index.opportunistic_sync().await?;
Ok(())
}
2022-06-30 08:59:42 +00:00
// ---- Functions for reading the mailbox ----
2022-06-29 17:24:21 +00:00
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
2023-11-02 16:25:56 +00:00
let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
let res_vec = self.k2v.select(storage::Selector::List(ops)).await?;
2022-06-29 11:16:58 +00:00
2022-06-29 17:24:21 +00:00
let mut meta_vec = vec![];
2023-11-02 16:25:56 +00:00
for res in res_vec.into_iter() {
let mut meta_opt = None;
2023-11-02 16:25:56 +00:00
// Resolve conflicts
for v in res.content().iter() {
match v {
2023-11-02 16:25:56 +00:00
storage::Alternative::Tombstone => (),
storage::Alternative::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
match meta_opt.as_mut() {
None => {
meta_opt = Some(meta);
}
Some(prevmeta) => {
prevmeta.try_merge(meta)?;
}
}
}
2022-06-29 17:24:21 +00:00
}
}
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
bail!("No valid meta value in k2v for {:?}", res.to_ref().key());
}
2022-06-29 17:24:21 +00:00
}
Ok(meta_vec)
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
2023-11-02 16:25:56 +00:00
let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?;
let body = obj_res.content().ok_or(anyhow!("missing body"))?;
cryptoblob::open(body, message_key)
2022-06-29 17:24:21 +00:00
}
2022-06-30 08:59:42 +00:00
// ---- Functions for changing the mailbox ----
async fn add_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
let add_flag_op = self.uid_index.state().op_flag_add(ident, flags.to_vec());
self.uid_index.push(add_flag_op).await
}
async fn del_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
let del_flag_op = self.uid_index.state().op_flag_del(ident, flags.to_vec());
self.uid_index.push(del_flag_op).await
}
2022-07-13 09:00:35 +00:00
async fn set_flags(&mut self, ident: UniqueIdent, flags: &[Flag]) -> Result<()> {
let set_flag_op = self.uid_index.state().op_flag_set(ident, flags.to_vec());
self.uid_index.push(set_flag_op).await
}
2022-07-12 14:35:11 +00:00
async fn append(
&mut self,
mail: IMF<'_>,
ident: Option<UniqueIdent>,
flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> {
2023-05-15 16:23:23 +00:00
let ident = ident.unwrap_or_else(gen_ident);
2022-06-29 17:24:21 +00:00
let message_key = gen_key();
futures::try_join!(
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
2023-11-02 16:25:56 +00:00
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?;
2022-06-29 17:24:21 +00:00
Ok::<_, anyhow::Error>(())
},
async {
// Save mail meta
let meta = MailMeta {
internaldate: now_msec(),
2023-07-25 17:08:48 +00:00
headers: mail.parsed.raw_headers.to_vec(),
2022-06-29 17:24:21 +00:00
message_key: message_key.clone(),
rfc822_size: mail.raw.len(),
};
2022-06-30 10:07:01 +00:00
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
2023-11-02 16:25:56 +00:00
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
2022-06-29 17:24:21 +00:00
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
2022-06-29 17:24:21 +00:00
)?;
2022-06-29 11:16:58 +00:00
2022-06-29 17:24:21 +00:00
// Add mail to Bayou mail index
2022-07-12 14:35:11 +00:00
let uid_state = self.uid_index.state();
let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
let uidvalidity = uid_state.uidvalidity;
let uid = match add_mail_op {
UidIndexOp::MailAdd(_, uid, _) => uid,
_ => unreachable!(),
};
2022-07-01 18:09:26 +00:00
self.uid_index.push(add_mail_op).await?;
2022-07-12 14:35:11 +00:00
Ok((uidvalidity, uid))
2022-07-01 18:09:26 +00:00
}
async fn append_from_s3<'a>(
&mut self,
mail: IMF<'a>,
ident: UniqueIdent,
2023-11-02 15:17:11 +00:00
blob_ref: storage::BlobRef,
2022-07-01 18:09:26 +00:00
message_key: Key,
) -> Result<()> {
futures::try_join!(
async {
// Copy mail body from previous location
2023-11-02 16:25:56 +00:00
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident));
blob_ref.copy(&dst).await?;
2022-07-01 18:09:26 +00:00
Ok::<_, anyhow::Error>(())
},
async {
// Save mail meta
let meta = MailMeta {
internaldate: now_msec(),
2023-10-12 10:21:59 +00:00
headers: mail.parsed.raw_headers.to_vec(),
2022-07-01 18:09:26 +00:00
message_key: message_key.clone(),
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
2023-11-02 16:25:56 +00:00
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
2022-07-01 18:09:26 +00:00
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
2022-07-01 18:09:26 +00:00
)?;
// Add mail to Bayou mail index
let add_mail_op = self.uid_index.state().op_mail_add(ident, vec![]);
2022-06-29 11:16:58 +00:00
self.uid_index.push(add_mail_op).await?;
2022-06-29 17:24:21 +00:00
Ok(())
}
async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
2022-06-30 10:07:01 +00:00
if !self.uid_index.state().table.contains_key(&ident) {
bail!("Cannot delete mail that doesn't exit");
}
2022-06-29 17:24:21 +00:00
let del_mail_op = self.uid_index.state().op_mail_del(ident);
self.uid_index.push(del_mail_op).await?;
futures::try_join!(
async {
// Delete mail body from S3
2023-11-02 16:25:56 +00:00
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?;
2022-06-29 17:24:21 +00:00
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
2023-11-02 16:25:56 +00:00
self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?;
2022-06-29 17:24:21 +00:00
Ok::<_, anyhow::Error>(())
}
)?;
Ok(())
}
2022-07-21 10:44:58 +00:00
async fn copy_from(
&mut self,
from: &MailboxInternal,
source_id: UniqueIdent,
) -> Result<UniqueIdent> {
2022-06-30 10:07:01 +00:00
let new_id = gen_ident();
2022-07-21 10:44:58 +00:00
self.copy_internal(from, source_id, new_id).await?;
Ok(new_id)
2022-06-30 10:07:01 +00:00
}
2023-05-15 16:23:23 +00:00
#[allow(dead_code)]
// 2023-05-15 will probably be used later
2022-06-30 10:07:01 +00:00
async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
self.copy_internal(from, id, id).await?;
from.delete(id).await?;
Ok(())
}
2022-06-30 10:45:22 +00:00
async fn copy_internal(
&mut self,
from: &MailboxInternal,
source_id: UniqueIdent,
new_id: UniqueIdent,
) -> Result<()> {
2023-11-02 16:25:56 +00:00
if self.encryption_key != from.encryption_key {
2022-06-30 10:45:22 +00:00
bail!("Message to be copied/moved does not belong to same account.");
}
let flags = from
.uid_index
.state()
.table
.get(&source_id)
.ok_or(anyhow!("Source mail not found"))?
.1
.clone();
2022-06-30 10:07:01 +00:00
futures::try_join!(
async {
2023-11-02 16:25:56 +00:00
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id));
self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?;
2022-06-30 10:07:01 +00:00
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
2023-11-02 16:25:56 +00:00
self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?;
2022-06-30 10:07:01 +00:00
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync(),
2022-06-30 10:07:01 +00:00
)?;
// Add mail to Bayou mail index
2022-06-30 10:45:22 +00:00
let add_mail_op = self.uid_index.state().op_mail_add(new_id, flags);
2022-06-30 10:07:01 +00:00
self.uid_index.push(add_mail_op).await?;
Ok(())
}
2022-06-29 11:16:58 +00:00
}
/// Prints the current state of a UID index to standard output.
///
/// Emits a header, the UIDVALIDITY / UIDNEXT / INTERNALSEQ counters, then one
/// line per entry of `idx_by_uid` with the UID, the hex-encoded identifier
/// and the comma-separated flags found in `table`, followed by a blank line.
///
/// An identifier that has no entry in `table` is printed with a placeholder
/// instead of aborting: this is a diagnostic helper and must not panic.
fn dump(uid_index: &Bayou<UidIndex>) {
    let s = uid_index.state();

    println!("---- MAILBOX STATE ----");
    println!("UIDVALIDITY {}", s.uidvalidity);
    println!("UIDNEXT {}", s.uidnext);
    println!("INTERNALSEQ {}", s.internalseq);

    for (uid, ident) in s.idx_by_uid.iter() {
        // Borrow the table entry: joining the flags does not need an owned copy.
        let flags = s.table.get(ident).map(|entry| entry.1.join(", "));
        println!(
            "{} {} {}",
            uid,
            hex::encode(ident.0),
            flags.as_deref().unwrap_or("<missing from table>")
        );
    }

    println!();
}
2022-06-29 17:24:21 +00:00
// ----
/// Per-message metadata, stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid>.
///
/// NOTE(review): this type is serialized; depending on the wire format the
/// order of the fields may be significant, so keep it stable.
#[derive(Serialize, Deserialize)]
pub struct MailMeta {
    /// INTERNALDATE of the message, in milliseconds since epoch
    pub internaldate: u64,
    /// Raw headers of the message
    pub headers: Vec<u8>,
    /// Secret key with which the entire message can be decrypted
    pub message_key: Key,
    /// RFC822 size of the message
    pub rfc822_size: usize,
}
impl MailMeta {
    /// Reconciles `self` with another metadata value for the same message.
    ///
    /// Headers, message key and RFC822 size must be identical on both sides,
    /// otherwise an error is returned and `self` is left untouched. When they
    /// agree, `self` keeps the most recent of the two internal dates.
    fn try_merge(&mut self, other: Self) -> Result<()> {
        let describes_same_message = self.headers == other.headers
            && self.message_key == other.message_key
            && self.rfc822_size == other.rfc822_size;

        if !describes_same_message {
            bail!("Conflicting MailMeta values.");
        }

        self.internaldate = self.internaldate.max(other.internaldate);
        Ok(())
    }
}