diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index d87456f..86cbc07 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::convert::TryFrom; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -6,7 +7,10 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, Future, FutureExt}; use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{PutObjectRequest, S3Client, S3}; +use rusoto_s3::{ + GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, +}; +use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; @@ -16,12 +20,15 @@ use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; +use crate::mail::IMF; use crate::time::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_WATCH_SK: &str = "watch"; +const MESSAGE_KEY: &str = "Message-Key"; + // When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // It is renewed every LOCK_DURATION/3 // If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we @@ -113,10 +120,10 @@ async fn incoming_mail_watch_process_internal( } } - // If we were able to open INBOX, and we have mail (implies lock is held), + // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { - match handle_incoming_mail(&user, &s3, inbox).await { + match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { Ok(()) => { prev_ct = Some(new_ct); } @@ -131,8 +138,84 @@ async fn incoming_mail_watch_process_internal( Ok(()) } -async fn handle_incoming_mail(user: &Arc, s3: &S3Client, inbox: &Arc) -> Result<()> { - unimplemented!() +async fn handle_incoming_mail( + user: &Arc, + s3: &S3Client, + inbox: &Arc, + lock_held: &watch::Receiver, +) -> Result<()> { + let mut lor = ListObjectsV2Request::default(); + lor.bucket = user.creds.storage.bucket.clone(); + lor.max_keys = Some(1000); + lor.prefix = Some("incoming/".into()); + let mails_res = s3.list_objects_v2(lor).await?; + + for object in mails_res.contents.unwrap_or_default() { + if !*lock_held.borrow() { + break; + } + if let Some(key) = object.key { + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::() { + move_incoming_message(user, s3, inbox, mail_id).await?; + } + } + } + } + + Ok(()) +} + +async fn move_incoming_message( + user: &Arc, + s3: &S3Client, + inbox: &Arc, + id: UniqueIdent, +) -> Result<()> { + info!("Moving incoming message: {}", id); + + let object_key = format!("incoming/{}", id); + + // 1. Fetch message from S3 + let mut gor = GetObjectRequest::default(); + gor.bucket = user.creds.storage.bucket.clone(); + gor.key = object_key.clone(); + let get_result = s3.get_object(gor).await?; + + // 1.a decrypt message key from headers + let key_encrypted_b64 = get_result + .metadata + .as_ref() + .ok_or(anyhow!("Missing key in metadata"))? + .get(MESSAGE_KEY) + .ok_or(anyhow!("Missing key in metadata"))?; + let key_encrypted = base64::decode(key_encrypted_b64)?; + let message_key = sodiumoxide::crypto::sealedbox::open( + &key_encrypted, + &user.creds.keys.public, + &user.creds.keys.secret, + ) + .map_err(|_| anyhow!("Cannot decrypt message key"))?; + let message_key = + cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; + + // 1.b retrieve message body + let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; + let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); + obj_body + .into_async_read() + .read_to_end(&mut mail_buf) + .await?; + let plain_mail = cryptoblob::open(&mail_buf, &message_key) + .map_err(|_| anyhow!("Cannot decrypt email content"))?; + + // 2 parse mail and add to inbox + let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; + inbox + .append_from_s3(msg, id, &object_key, message_key) + .await?; + + Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- @@ -307,7 +390,6 @@ async fn k2v_lock_loop_internal( if let Some(ct) = release { let _ = k2v.delete_item(pk, sk, ct.clone()).await; } - } // ---- UTIL: function to wait for a value to have changed in K2V ---- @@ -372,7 +454,7 @@ impl EncryptedMessage { por.bucket = creds.storage.bucket.clone(); por.key = format!("incoming/{}", gen_ident().to_string()); por.metadata = Some( - [("Message-Key".to_string(), key_header)] + [(MESSAGE_KEY.to_string(), key_header)] .into_iter() .collect::>(), ); diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 35b681a..e63f712 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -98,6 +98,21 @@ impl Mailbox { self.mbox.write().await.append(msg, ident).await } + /// Insert an email into the mailbox, copying it from an existing S3 object + pub async fn append_from_s3<'a>( + &self, + msg: IMF<'a>, + ident: UniqueIdent, + s3_key: &str, + message_key: Key, + ) -> Result<()> { + self.mbox + .write() + .await + .append_from_s3(msg, ident, s3_key, message_key) + .await + } + /// Delete a message definitively from the mailbox pub async fn delete<'a>(&self, id: UniqueIdent) -> Result<()> { self.mbox.write().await.delete(id).await @@ -279,6 +294,50 @@ impl MailboxInternal { Ok(()) } + async fn append_from_s3<'a>( + &mut self, + mail: IMF<'a>, + ident: UniqueIdent, + s3_key: &str, + message_key: Key, + ) -> Result<()> { + futures::try_join!( + async { + // Copy mail body from previous location + let mut cor = CopyObjectRequest::default(); + cor.bucket = self.bucket.clone(); + cor.key = format!("{}/{}", self.mail_path, ident); + cor.copy_source = format!("{}/{}", self.bucket, s3_key); + cor.metadata_directive = Some("REPLACE".into()); + self.s3.copy_object(cor).await?; + Ok::<_, anyhow::Error>(()) + }, + async { + // Save mail meta + let meta = MailMeta { + internaldate: now_msec(), + headers: mail.raw[..mail.parsed.offset_body].to_vec(), + message_key: message_key.clone(), + rfc822_size: mail.raw.len(), + }; + let meta_blob = seal_serialize(&meta, &self.encryption_key)?; + self.k2v + .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + .await?; + Ok::<_, anyhow::Error>(()) + } + )?; + + // Add mail to Bayou mail index + let add_mail_op = self + .uid_index + .state() + .op_mail_add(ident, vec!["\\Unseen".into()]); + self.uid_index.push(add_mail_op).await?; + + Ok(()) + } + async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { if !self.uid_index.state().table.contains_key(&ident) { bail!("Cannot delete mail that doesn't exit");