Code to add mail to inbox

This commit is contained in:
Alex 2022-07-01 20:09:26 +02:00
parent 14b420d9df
commit 05eb2e050b
Signed by: lx
GPG key ID: 0E496D15096376BE
2 changed files with 148 additions and 7 deletions

View file

@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::time::Duration; use std::time::Duration;
@ -6,7 +7,10 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, Future, FutureExt}; use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{PutObjectRequest, S3Client, S3}; use rusoto_s3::{
GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use tokio::io::AsyncReadExt;
use tokio::sync::watch; use tokio::sync::watch;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
@ -16,12 +20,15 @@ use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity; use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*; use crate::mail::unique_ident::*;
use crate::mail::user::User; use crate::mail::user::User;
use crate::mail::IMF;
use crate::time::now_msec; use crate::time::now_msec;
const INCOMING_PK: &str = "incoming"; const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch"; const INCOMING_WATCH_SK: &str = "watch";
const MESSAGE_KEY: &str = "Message-Key";
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3 // It is renewed every LOCK_DURATION/3
// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we // If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
@ -113,10 +120,10 @@ async fn incoming_mail_watch_process_internal(
} }
} }
// If we were able to open INBOX, and we have mail (implies lock is held), // If we were able to open INBOX, and we have mail,
// fetch new mail // fetch new mail
if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
match handle_incoming_mail(&user, &s3, inbox).await { match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
Ok(()) => { Ok(()) => {
prev_ct = Some(new_ct); prev_ct = Some(new_ct);
} }
@ -131,8 +138,84 @@ async fn incoming_mail_watch_process_internal(
Ok(()) Ok(())
} }
async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> { async fn handle_incoming_mail(
unimplemented!() user: &Arc<User>,
s3: &S3Client,
inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>,
) -> Result<()> {
let mut lor = ListObjectsV2Request::default();
lor.bucket = user.creds.storage.bucket.clone();
lor.max_keys = Some(1000);
lor.prefix = Some("incoming/".into());
let mails_res = s3.list_objects_v2(lor).await?;
for object in mails_res.contents.unwrap_or_default() {
if !*lock_held.borrow() {
break;
}
if let Some(key) = object.key {
if let Some(mail_id) = key.strip_prefix("incoming/") {
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
move_incoming_message(user, s3, inbox, mail_id).await?;
}
}
}
}
Ok(())
}
async fn move_incoming_message(
user: &Arc<User>,
s3: &S3Client,
inbox: &Arc<Mailbox>,
id: UniqueIdent,
) -> Result<()> {
info!("Moving incoming message: {}", id);
let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3
let mut gor = GetObjectRequest::default();
gor.bucket = user.creds.storage.bucket.clone();
gor.key = object_key.clone();
let get_result = s3.get_object(gor).await?;
// 1.a decrypt message key from headers
let key_encrypted_b64 = get_result
.metadata
.as_ref()
.ok_or(anyhow!("Missing key in metadata"))?
.get(MESSAGE_KEY)
.ok_or(anyhow!("Missing key in metadata"))?;
let key_encrypted = base64::decode(key_encrypted_b64)?;
let message_key = sodiumoxide::crypto::sealedbox::open(
&key_encrypted,
&user.creds.keys.public,
&user.creds.keys.secret,
)
.map_err(|_| anyhow!("Cannot decrypt message key"))?;
let message_key =
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body
let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?;
let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize);
obj_body
.into_async_read()
.read_to_end(&mut mail_buf)
.await?;
let plain_mail = cryptoblob::open(&mail_buf, &message_key)
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
inbox
.append_from_s3(msg, id, &object_key, message_key)
.await?;
Ok(())
} }
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
@ -307,7 +390,6 @@ async fn k2v_lock_loop_internal(
if let Some(ct) = release { if let Some(ct) = release {
let _ = k2v.delete_item(pk, sk, ct.clone()).await; let _ = k2v.delete_item(pk, sk, ct.clone()).await;
} }
} }
// ---- UTIL: function to wait for a value to have changed in K2V ---- // ---- UTIL: function to wait for a value to have changed in K2V ----
@ -372,7 +454,7 @@ impl EncryptedMessage {
por.bucket = creds.storage.bucket.clone(); por.bucket = creds.storage.bucket.clone();
por.key = format!("incoming/{}", gen_ident().to_string()); por.key = format!("incoming/{}", gen_ident().to_string());
por.metadata = Some( por.metadata = Some(
[("Message-Key".to_string(), key_header)] [(MESSAGE_KEY.to_string(), key_header)]
.into_iter() .into_iter()
.collect::<HashMap<_, _>>(), .collect::<HashMap<_, _>>(),
); );

View file

@ -98,6 +98,21 @@ impl Mailbox {
self.mbox.write().await.append(msg, ident).await self.mbox.write().await.append(msg, ident).await
} }
/// Insert an email into the mailbox, copying it from an existing S3 object
pub async fn append_from_s3<'a>(
&self,
msg: IMF<'a>,
ident: UniqueIdent,
s3_key: &str,
message_key: Key,
) -> Result<()> {
self.mbox
.write()
.await
.append_from_s3(msg, ident, s3_key, message_key)
.await
}
/// Delete a message definitively from the mailbox /// Delete a message definitively from the mailbox
pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> { pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> {
self.mbox.write().await.delete(id).await self.mbox.write().await.delete(id).await
@ -279,6 +294,50 @@ impl MailboxInternal {
Ok(()) Ok(())
} }
async fn append_from_s3<'a>(
&mut self,
mail: IMF<'a>,
ident: UniqueIdent,
s3_key: &str,
message_key: Key,
) -> Result<()> {
futures::try_join!(
async {
// Copy mail body from previous location
let mut cor = CopyObjectRequest::default();
cor.bucket = self.bucket.clone();
cor.key = format!("{}/{}", self.mail_path, ident);
cor.copy_source = format!("{}/{}", self.bucket, s3_key);
cor.metadata_directive = Some("REPLACE".into());
self.s3.copy_object(cor).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Save mail meta
let meta = MailMeta {
internaldate: now_msec(),
headers: mail.raw[..mail.parsed.offset_body].to_vec(),
message_key: message_key.clone(),
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
.await?;
Ok::<_, anyhow::Error>(())
}
)?;
// Add mail to Bayou mail index
let add_mail_op = self
.uid_index
.state()
.op_mail_add(ident, vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
Ok(())
}
async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
if !self.uid_index.state().table.contains_key(&ident) { if !self.uid_index.state().table.contains_key(&ident) {
bail!("Cannot delete mail that doesn't exit"); bail!("Cannot delete mail that doesn't exit");