2022-06-29 17:24:21 +00:00
|
|
|
use anyhow::{anyhow, bail, Result};
|
2022-06-29 11:16:58 +00:00
|
|
|
use k2v_client::K2vClient;
|
2022-06-29 17:24:21 +00:00
|
|
|
use k2v_client::{BatchReadOp, Filter, K2vValue};
|
2022-06-29 18:00:38 +00:00
|
|
|
use rusoto_s3::{DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3};
|
2022-06-29 17:24:21 +00:00
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use tokio::io::AsyncReadExt;
|
2022-06-29 13:39:54 +00:00
|
|
|
use tokio::sync::RwLock;
|
2022-06-29 11:16:58 +00:00
|
|
|
|
|
|
|
use crate::bayou::Bayou;
|
2022-06-29 17:27:32 +00:00
|
|
|
use crate::cryptoblob::{self, gen_key, open_deserialize, Key};
|
2022-06-29 11:16:58 +00:00
|
|
|
use crate::login::Credentials;
|
|
|
|
use crate::mail::uidindex::*;
|
2022-06-29 13:52:09 +00:00
|
|
|
use crate::mail::unique_ident::*;
|
2022-06-29 11:16:58 +00:00
|
|
|
use crate::mail::IMF;
|
2022-06-29 17:24:21 +00:00
|
|
|
use crate::time::now_msec;
|
2022-06-29 11:16:58 +00:00
|
|
|
|
2022-06-29 13:52:09 +00:00
|
|
|
pub struct Mailbox {
|
|
|
|
id: UniqueIdent,
|
|
|
|
mbox: RwLock<MailboxInternal>,
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
2022-06-29 11:41:05 +00:00
|
|
|
|
2022-06-29 11:16:58 +00:00
|
|
|
impl Mailbox {
|
2022-06-29 13:52:09 +00:00
|
|
|
pub(super) async fn open(creds: &Credentials, id: UniqueIdent) -> Result<Self> {
|
|
|
|
let index_path = format!("index/{}", id);
|
|
|
|
let mail_path = format!("mail/{}", id);
|
2022-06-29 11:16:58 +00:00
|
|
|
|
2022-06-29 13:39:54 +00:00
|
|
|
let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?;
|
|
|
|
uid_index.sync().await?;
|
|
|
|
|
2022-06-29 13:52:09 +00:00
|
|
|
let mbox = RwLock::new(MailboxInternal {
|
|
|
|
id,
|
2022-06-29 11:16:58 +00:00
|
|
|
bucket: creds.bucket().to_string(),
|
2022-06-29 13:52:09 +00:00
|
|
|
encryption_key: creds.keys.master.clone(),
|
2022-06-29 11:16:58 +00:00
|
|
|
k2v: creds.k2v_client()?,
|
|
|
|
s3: creds.s3_client()?,
|
|
|
|
uid_index,
|
|
|
|
mail_path,
|
2022-06-29 13:52:09 +00:00
|
|
|
});
|
|
|
|
|
|
|
|
Ok(Self { id, mbox })
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 15:58:31 +00:00
|
|
|
/// Sync data with backing store
|
|
|
|
pub async fn sync(&self) -> Result<()> {
|
2022-06-29 18:00:38 +00:00
|
|
|
self.mbox.write().await.sync().await
|
2022-06-29 15:58:31 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 13:39:54 +00:00
|
|
|
/// Get a clone of the current UID Index of this mailbox
|
|
|
|
/// (cloning is cheap so don't hesitate to use this)
|
|
|
|
pub async fn current_uid_index(&self) -> UidIndex {
|
2022-06-29 13:52:09 +00:00
|
|
|
self.mbox.read().await.uid_index.state().clone()
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 11:41:05 +00:00
|
|
|
/// Insert an email in the mailbox
|
2022-06-29 17:24:21 +00:00
|
|
|
pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> {
|
|
|
|
self.mbox.write().await.append(msg, None).await
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 11:41:05 +00:00
|
|
|
/// Copy an email from an other Mailbox to this mailbox
|
|
|
|
/// (use this when possible, as it allows for a certain number of storage optimizations)
|
2022-06-29 18:00:38 +00:00
|
|
|
pub async fn copy_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> {
|
|
|
|
unimplemented!()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Move an email from an other Mailbox to this mailbox
|
|
|
|
/// (use this when possible, as it allows for a certain number of storage optimizations)
|
|
|
|
pub async fn move_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> {
|
2022-06-29 11:41:05 +00:00
|
|
|
unimplemented!()
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
/// Fetch the metadata (headers + some more info) of the specified
|
|
|
|
/// mail IDs
|
|
|
|
pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
|
|
|
self.mbox.read().await.fetch_meta(ids).await
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
/// Fetch an entire e-mail
|
|
|
|
pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
|
|
|
|
self.mbox.read().await.fetch_full(id, message_key).await
|
2022-06-29 11:16:58 +00:00
|
|
|
}
|
2022-06-29 14:04:11 +00:00
|
|
|
|
|
|
|
/// Test procedure TODO WILL REMOVE THIS
|
|
|
|
pub async fn test(&self) -> Result<()> {
|
|
|
|
self.mbox.write().await.test().await
|
|
|
|
}
|
2022-06-29 13:39:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ----
|
|
|
|
|
|
|
|
// Non standard but common flags:
|
|
|
|
// https://www.iana.org/assignments/imap-jmap-keywords/imap-jmap-keywords.xhtml
|
|
|
|
struct MailboxInternal {
|
2022-06-29 13:52:09 +00:00
|
|
|
id: UniqueIdent,
|
2022-06-29 13:39:54 +00:00
|
|
|
bucket: String,
|
2022-06-29 13:52:09 +00:00
|
|
|
mail_path: String,
|
|
|
|
encryption_key: Key,
|
2022-06-29 11:16:58 +00:00
|
|
|
|
2022-06-29 13:39:54 +00:00
|
|
|
k2v: K2vClient,
|
|
|
|
s3: S3Client,
|
|
|
|
|
|
|
|
uid_index: Bayou<UidIndex>,
|
|
|
|
}
|
2022-06-29 11:41:05 +00:00
|
|
|
|
2022-06-29 13:39:54 +00:00
|
|
|
impl MailboxInternal {
|
2022-06-29 18:00:38 +00:00
|
|
|
async fn sync(&mut self) -> Result<()> {
|
|
|
|
self.uid_index.sync().await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
|
|
|
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
|
|
|
|
let ops = ids
|
|
|
|
.iter()
|
|
|
|
.map(|id| BatchReadOp {
|
|
|
|
partition_key: &self.mail_path,
|
|
|
|
filter: Filter {
|
|
|
|
start: Some(id),
|
|
|
|
end: None,
|
|
|
|
prefix: None,
|
|
|
|
limit: None,
|
|
|
|
reverse: false,
|
|
|
|
},
|
|
|
|
single_item: true,
|
|
|
|
conflicts_only: false,
|
|
|
|
tombstones: false,
|
|
|
|
})
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
let res_vec = self.k2v.read_batch(&ops).await?;
|
2022-06-29 11:16:58 +00:00
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
let mut meta_vec = vec![];
|
|
|
|
for res in res_vec {
|
|
|
|
if res.items.len() != 1 {
|
|
|
|
bail!("Expected 1 item, got {}", res.items.len());
|
|
|
|
}
|
|
|
|
let (_, cv) = res.items.iter().next().unwrap();
|
|
|
|
if cv.value.len() != 1 {
|
|
|
|
bail!("Expected 1 value, got {}", cv.value.len());
|
|
|
|
}
|
|
|
|
match &cv.value[0] {
|
|
|
|
K2vValue::Tombstone => bail!("Expected value, got tombstone"),
|
|
|
|
K2vValue::Value(v) => {
|
|
|
|
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
|
|
|
|
meta_vec.push(meta);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(meta_vec)
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
|
|
|
|
let mut gor = GetObjectRequest::default();
|
|
|
|
gor.bucket = self.bucket.clone();
|
|
|
|
gor.key = format!("{}/{}", self.mail_path, id);
|
|
|
|
|
|
|
|
let obj_res = self.s3.get_object(gor).await?;
|
|
|
|
|
|
|
|
let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
|
|
|
|
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
|
|
|
|
obj_body.into_async_read().read_to_end(&mut buf).await?;
|
|
|
|
|
|
|
|
Ok(cryptoblob::open(&buf, &message_key)?)
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn append(&mut self, mail: IMF<'_>, ident: Option<UniqueIdent>) -> Result<()> {
|
|
|
|
let ident = ident.unwrap_or_else(|| gen_ident());
|
|
|
|
let message_key = gen_key();
|
|
|
|
|
|
|
|
futures::try_join!(
|
|
|
|
async {
|
|
|
|
// Encrypt and save mail body
|
|
|
|
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
|
|
|
|
let mut por = PutObjectRequest::default();
|
|
|
|
por.bucket = self.bucket.clone();
|
|
|
|
por.key = format!("{}/{}", self.mail_path, ident);
|
|
|
|
por.body = Some(message_blob.into());
|
|
|
|
self.s3.put_object(por).await?;
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
|
|
},
|
|
|
|
async {
|
|
|
|
// Save mail meta
|
|
|
|
let meta = MailMeta {
|
|
|
|
internaldate: now_msec(),
|
|
|
|
headers: mail.raw[..mail.parsed.offset_body].to_vec(),
|
|
|
|
message_key: message_key.clone(),
|
|
|
|
rfc822_size: mail.raw.len(),
|
|
|
|
};
|
|
|
|
let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?;
|
|
|
|
self.k2v
|
|
|
|
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
|
|
|
|
.await?;
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
|
|
}
|
|
|
|
)?;
|
2022-06-29 11:16:58 +00:00
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
// Add mail to Bayou mail index
|
2022-06-29 11:16:58 +00:00
|
|
|
let add_mail_op = self
|
|
|
|
.uid_index
|
|
|
|
.state()
|
2022-06-29 17:24:21 +00:00
|
|
|
.op_mail_add(ident, vec!["\\Unseen".into()]);
|
2022-06-29 11:16:58 +00:00
|
|
|
self.uid_index.push(add_mail_op).await?;
|
|
|
|
|
2022-06-29 17:24:21 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
|
|
|
|
let del_mail_op = self.uid_index.state().op_mail_del(ident);
|
|
|
|
self.uid_index.push(del_mail_op).await?;
|
|
|
|
|
|
|
|
futures::try_join!(
|
|
|
|
async {
|
|
|
|
// Delete mail body from S3
|
|
|
|
let mut dor = DeleteObjectRequest::default();
|
|
|
|
dor.bucket = self.bucket.clone();
|
|
|
|
dor.key = format!("{}/{}", self.mail_path, ident);
|
|
|
|
self.s3.delete_object(dor).await?;
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
|
|
},
|
|
|
|
async {
|
|
|
|
// Delete mail meta from K2V
|
|
|
|
let sk = ident.to_string();
|
|
|
|
let v = self.k2v.read_item(&self.mail_path, &sk).await?;
|
|
|
|
self.k2v
|
|
|
|
.delete_item(&self.mail_path, &sk, v.causality)
|
|
|
|
.await?;
|
|
|
|
Ok::<_, anyhow::Error>(())
|
|
|
|
}
|
|
|
|
)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
// ----
|
|
|
|
|
|
|
|
async fn test(&mut self) -> Result<()> {
|
|
|
|
self.uid_index.sync().await?;
|
|
|
|
|
|
|
|
dump(&self.uid_index);
|
|
|
|
|
|
|
|
let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
|
|
|
|
Subject: Welcome to Aerogramme!!
|
|
|
|
|
2022-06-29 18:10:42 +00:00
|
|
|
This is just a test email, feel free to ignore.
|
|
|
|
"#;
|
2022-06-29 17:24:21 +00:00
|
|
|
let mail = IMF::try_from(&mail[..]).unwrap();
|
|
|
|
self.append(mail, None).await?;
|
|
|
|
|
2022-06-29 11:16:58 +00:00
|
|
|
dump(&self.uid_index);
|
|
|
|
|
|
|
|
if self.uid_index.state().idx_by_uid.len() > 6 {
|
|
|
|
for i in 0..2 {
|
|
|
|
let (_, ident) = self
|
|
|
|
.uid_index
|
|
|
|
.state()
|
|
|
|
.idx_by_uid
|
|
|
|
.iter()
|
|
|
|
.skip(3 + i)
|
|
|
|
.next()
|
|
|
|
.unwrap();
|
2022-06-29 17:24:21 +00:00
|
|
|
|
|
|
|
self.delete(*ident).await?;
|
2022-06-29 11:16:58 +00:00
|
|
|
|
|
|
|
dump(&self.uid_index);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn dump(uid_index: &Bayou<UidIndex>) {
|
|
|
|
let s = uid_index.state();
|
|
|
|
println!("---- MAILBOX STATE ----");
|
|
|
|
println!("UIDVALIDITY {}", s.uidvalidity);
|
|
|
|
println!("UIDNEXT {}", s.uidnext);
|
|
|
|
println!("INTERNALSEQ {}", s.internalseq);
|
|
|
|
for (uid, ident) in s.idx_by_uid.iter() {
|
|
|
|
println!(
|
|
|
|
"{} {} {}",
|
|
|
|
uid,
|
|
|
|
hex::encode(ident.0),
|
|
|
|
s.table.get(ident).cloned().unwrap().1.join(", ")
|
|
|
|
);
|
|
|
|
}
|
|
|
|
println!("");
|
|
|
|
}
|
2022-06-29 17:24:21 +00:00
|
|
|
|
|
|
|
// ----
|
|
|
|
|
|
|
|
/// The metadata of a message that is stored in K2V
|
|
|
|
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
pub struct MailMeta {
|
|
|
|
/// INTERNALDATE field (milliseconds since epoch)
|
|
|
|
pub internaldate: u64,
|
|
|
|
/// Headers of the message
|
|
|
|
pub headers: Vec<u8>,
|
|
|
|
/// Secret key for decrypting entire message
|
|
|
|
pub message_key: Key,
|
|
|
|
/// RFC822 size
|
|
|
|
pub rfc822_size: usize,
|
|
|
|
}
|