diff --git a/src/lmtp.rs b/src/lmtp.rs index 5ab7429..78fe28d 100644 --- a/src/lmtp.rs +++ b/src/lmtp.rs @@ -21,6 +21,7 @@ use crate::config::*; use crate::cryptoblob::*; use crate::login::*; use crate::mail::unique_ident::*; +use crate::mail::incoming::EncryptedMessage; pub struct LmtpServer { bind_addr: SocketAddr, @@ -210,41 +211,3 @@ impl Config for LmtpServer { } } -// ---- - -struct EncryptedMessage { - key: Key, - encrypted_body: Vec, -} - -impl EncryptedMessage { - fn new(body: Vec) -> Result { - let key = gen_key(); - let encrypted_body = seal(&body, &key)?; - Ok(Self { - key, - encrypted_body, - }) - } - - async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - - let encrypted_key = - sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); - - let mut por = PutObjectRequest::default(); - por.bucket = creds.storage.bucket.clone(); - por.key = format!("incoming/{}", gen_ident().to_string()); - por.metadata = Some( - [("Message-Key".to_string(), key_header)] - .into_iter() - .collect::>(), - ); - por.body = Some(self.encrypted_body.clone().into()); - s3_client.put_object(por).await?; - - Ok(()) - } -} diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 4455c91..24c7a2a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,16 +1,223 @@ +use std::collections::HashMap; use std::sync::{Arc, Weak}; use std::time::Duration; +use anyhow::Result; +use k2v_client::{CausalityToken, K2vClient, K2vValue}; +use rusoto_s3::{PutObjectRequest, S3Client, S3}; use tokio::sync::watch; +use tracing::error; -use crate::mail::unique_ident::UniqueIdent; -use crate::mail::user::User; +use crate::cryptoblob; +use crate::login::{Credentials, PublicCredentials}; +use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; +use crate::mail::unique_ident::*; +use crate::mail::user::User; +use crate::time::now_msec; -pub async fn incoming_mail_watch_process(user: Weak, rx_inbox_id: watch::Receiver>) { - while Weak::upgrade(&user).is_some() { - eprintln!("User still available"); - tokio::time::sleep(Duration::from_secs(10)).await; +const INCOMING_PK: &str = "incoming"; +const INCOMING_LOCK_SK: &str = "lock"; +const INCOMING_WATCH_SK: &str = "watch"; + +pub async fn incoming_mail_watch_process( + user: Weak, + creds: Credentials, + rx_inbox_id: watch::Receiver>, +) { + if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await { + error!("Error in incoming mail watch process: {}", e); + } +} + +async fn incoming_mail_watch_process_internal( + user: Weak, + creds: Credentials, + mut rx_inbox_id: watch::Receiver>, +) -> Result<()> { + let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); + + let k2v = creds.k2v_client()?; + let s3 = creds.s3_client()?; + + let mut inbox: Option> = None; + let mut prev_ct: Option = None; + + loop { + let new_mail = if *lock_held.borrow() { + tokio::select! { + ct = wait_new_mail(&k2v, &prev_ct) => Some(ct), + _ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + } else { + tokio::select! { + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, + } + }; + + if let Some(user) = Weak::upgrade(&user) { + eprintln!("User still available"); + + // If INBOX no longer is same mailbox, open new mailbox + let inbox_id = rx_inbox_id.borrow().clone(); + if let Some((id, uidvalidity)) = inbox_id { + if Some(id) != inbox.as_ref().map(|b| b.id) { + match user.open_mailbox_by_id(id, uidvalidity).await { + Ok(mb) => { + inbox = mb; + } + Err(e) => { + inbox = None; + error!("Error when opening inbox ({}): {}", id, e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + } + + // If we were able to open INBOX, and we have mail (implies lock is held), + // fetch new mail + if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { + match handle_incoming_mail(&user, &s3, inbox).await { + Ok(()) => { + prev_ct = Some(new_ct); + } + Err(e) => { + error!("Could not fetch incoming mail: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + } + } else { + eprintln!("User no longer available, exiting incoming loop."); + break; + } } drop(rx_inbox_id); + Ok(()) +} + +async fn wait_new_mail(k2v: &K2vClient, prev_ct: &Option) -> CausalityToken { + loop { + if let Some(ct) = &prev_ct { + match k2v + .poll_item(INCOMING_PK, INCOMING_WATCH_SK, ct.clone(), None) + .await + { + Err(e) => { + error!("Error when waiting for incoming watch: {}, sleeping", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(None) => continue, + Ok(Some(cv)) => { + return cv.causality; + } + } + } else { + match k2v.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { + Err(k2v_client::Error::NotFound) => { + if let Err(e) = k2v + .insert_item(INCOMING_PK, INCOMING_WATCH_SK, vec![0u8], None) + .await + { + error!("Error when waiting for incoming watch: {}, sleeping", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + Err(e) => { + error!("Error when waiting for incoming watch: {}, sleeping", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + return cv.causality; + } + } + } + } +} + +async fn handle_incoming_mail(user: &Arc, s3: &S3Client, inbox: &Arc) -> Result<()> { + unimplemented!() +} + +fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver { + let (held_tx, held_rx) = watch::channel(false); + + tokio::spawn(async move { + if let Err(e) = k2v_lock_loop_internal(k2v, pk, sk, &held_tx).await { + error!("Error in k2v locking loop: {}", e); + } + let _ = held_tx.send(false); + }); + + held_rx +} + +async fn k2v_lock_loop_internal( + k2v: K2vClient, + pk: &'static str, + sk: &'static str, + held_tx: &watch::Sender, +) -> Result<()> { + unimplemented!() +} + +// ---- + +pub struct EncryptedMessage { + key: cryptoblob::Key, + encrypted_body: Vec, +} + +impl EncryptedMessage { + pub fn new(body: Vec) -> Result { + let key = cryptoblob::gen_key(); + let encrypted_body = cryptoblob::seal(&body, &key)?; + Ok(Self { + key, + encrypted_body, + }) + } + + pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { + let s3_client = creds.storage.s3_client()?; + let k2v_client = creds.storage.k2v_client()?; + + // Get causality token of previous watch key + let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { + Err(_) => None, + Ok(cv) => Some(cv.causality), + }; + + // Write mail to encrypted storage + let encrypted_key = + sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); + let key_header = base64::encode(&encrypted_key); + + let mut por = PutObjectRequest::default(); + por.bucket = creds.storage.bucket.clone(); + por.key = format!("incoming/{}", gen_ident().to_string()); + por.metadata = Some( + [("Message-Key".to_string(), key_header)] + .into_iter() + .collect::>(), + ); + por.body = Some(self.encrypted_body.clone().into()); + s3_client.put_object(por).await?; + + // Update watch key to signal new mail + k2v_client + .insert_item( + INCOMING_PK, + INCOMING_WATCH_SK, + gen_ident().0.to_vec(), + watch_ct, + ) + .await?; + + Ok(()) + } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c4fa435..35b681a 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -17,7 +17,7 @@ use crate::mail::IMF; use crate::time::now_msec; pub struct Mailbox { - id: UniqueIdent, + pub(super) id: UniqueIdent, mbox: RwLock, } diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 94a1712..b6054b0 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,10 +1,10 @@ use std::convert::TryFrom; +pub mod incoming; pub mod mailbox; pub mod uidindex; pub mod unique_ident; pub mod user; -pub mod incoming; // Internet Message Format // aka RFC 822 - RFC 2822 - RFC 5322 diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 736c763..7166ea7 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -163,7 +163,8 @@ impl BayouState for UidIndex { } } UidIndexOp::BumpUidvalidity(count) => { - new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count).unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); + new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count) + .unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap()); } } new diff --git a/src/mail/user.rs b/src/mail/user.rs index fefb084..c2d1d85 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -9,10 +9,10 @@ use tokio::sync::watch; use crate::cryptoblob::{open_deserialize, seal_serialize}; use crate::login::{Credentials, StorageCredentials}; +use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; -use crate::mail::incoming::incoming_mail_watch_process; use crate::time::now_msec; const MAILBOX_HIERARCHY_DELIMITER: &str = "/"; @@ -119,7 +119,7 @@ impl User { let user = Arc::new(Self { username, - creds, + creds: creds.clone(), k2v, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), @@ -128,12 +128,16 @@ impl User { // Ensure INBOX exists (done inside load_mailbox_list) user.load_mailbox_list().await?; - tokio::spawn(incoming_mail_watch_process(Arc::downgrade(&user), rx_inbox_id)); + tokio::spawn(incoming_mail_watch_process( + Arc::downgrade(&user), + user.creds.clone(), + rx_inbox_id, + )); Ok(user) } - async fn open_mailbox_by_id( + pub(super) async fn open_mailbox_by_id( &self, id: UniqueIdent, min_uidvalidity: ImapUidvalidity, @@ -167,15 +171,15 @@ impl User { let mut list = BTreeMap::new(); for v in cv.value { if let K2vValue::Value(vbytes) = v { - let list2 = open_deserialize::(&vbytes, &self.creds.keys.master)?; + let list2 = + open_deserialize::(&vbytes, &self.creds.keys.master)?; list = merge_mailbox_lists(list, list2); } } (list, Some(cv.causality)) - }, + } }; - // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. // Also, ensure that the mpsc::watch that keeps track of the