aerogramme/aero-collections/src/mail/incoming.rs

444 lines
15 KiB
Rust
Raw Normal View History

2022-06-30 14:18:08 +00:00
use std::sync::{Arc, Weak};
use std::time::Duration;
2022-07-01 17:23:29 +00:00
use anyhow::{anyhow, bail, Result};
2023-12-18 16:09:44 +00:00
use base64::Engine;
2022-07-12 13:31:29 +00:00
use futures::{future::BoxFuture, FutureExt};
2023-11-17 11:15:44 +00:00
//use tokio::io::AsyncReadExt;
2022-06-30 14:18:08 +00:00
use tokio::sync::watch;
2024-01-03 15:52:31 +00:00
use tracing::{debug, error, info, warn};
2022-06-30 14:18:08 +00:00
2024-03-08 07:43:28 +00:00
use aero_user::cryptoblob;
use aero_user::login::{Credentials, PublicCredentials};
use aero_user::storage;
use aero_bayou::timestamp::now_msec;
use crate::mail::mailbox::Mailbox;
2022-06-30 14:18:08 +00:00
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*;
2024-02-27 17:33:49 +00:00
use crate::user::User;
2022-07-01 18:09:26 +00:00
use crate::mail::IMF;
const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch";
const MESSAGE_KEY: &str = "message-key";
2022-07-01 18:09:26 +00:00
2022-07-01 17:23:29 +00:00
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3
// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
// lost the lock.
const LOCK_DURATION: Duration = Duration::from_secs(300);
2022-07-01 17:41:29 +00:00
// In addition to checking when notified, also check for new mail every 10 minutes
const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
pub async fn incoming_mail_watch_process(
user: Weak<User>,
creds: Credentials,
rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) {
if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
error!("Error in incoming mail watch process: {}", e);
}
}
async fn incoming_mail_watch_process_internal(
user: Weak<User>,
creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> {
2023-12-27 13:58:28 +00:00
let mut lock_held = k2v_lock_loop(
creds.storage.build().await?,
storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
);
2023-12-21 20:54:36 +00:00
let storage = creds.storage.build().await?;
let mut inbox: Option<Arc<Mailbox>> = None;
2023-12-18 16:09:44 +00:00
let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
2022-06-30 14:18:08 +00:00
loop {
2023-11-02 14:28:19 +00:00
let maybe_updated_incoming_key = if *lock_held.borrow() {
2024-01-03 15:52:31 +00:00
debug!("incoming lock held");
2022-06-30 18:35:27 +00:00
let wait_new_mail = async {
loop {
2023-12-27 13:58:28 +00:00
match storage.row_poll(&incoming_key).await {
2023-12-18 16:09:44 +00:00
Ok(row_val) => break row_val.row_ref,
2022-06-30 18:35:27 +00:00
Err(e) => {
error!("Error in wait_new_mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
}
};
tokio::select! {
2023-11-02 14:28:19 +00:00
inc_k = wait_new_mail => Some(inc_k),
2023-12-18 16:09:44 +00:00
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
2023-11-02 14:28:19 +00:00
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
}
} else {
2024-01-03 15:52:31 +00:00
debug!("incoming lock not held");
tokio::select! {
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
}
};
2022-07-01 17:41:29 +00:00
let user = match Weak::upgrade(&user) {
Some(user) => user,
None => {
2024-01-03 15:52:31 +00:00
debug!("User no longer available, exiting incoming loop.");
2022-07-01 17:41:29 +00:00
break;
}
2022-07-01 17:41:29 +00:00
};
2024-01-03 15:52:31 +00:00
debug!("User still available");
2022-07-01 17:41:29 +00:00
// If INBOX no longer is same mailbox, open new mailbox
2023-05-15 16:23:23 +00:00
let inbox_id = *rx_inbox_id.borrow();
2022-07-01 17:41:29 +00:00
if let Some((id, uidvalidity)) = inbox_id {
if Some(id) != inbox.as_ref().map(|b| b.id) {
match user.open_mailbox_by_id(id, uidvalidity).await {
Ok(mb) => {
2022-07-21 10:50:44 +00:00
inbox = Some(mb);
}
Err(e) => {
2022-07-01 17:41:29 +00:00
inbox = None;
error!("Error when opening inbox ({}): {}", id, e);
tokio::time::sleep(Duration::from_secs(30)).await;
2022-07-01 17:41:29 +00:00
continue;
}
}
}
2022-07-01 17:41:29 +00:00
}
2022-07-01 18:09:26 +00:00
// If we were able to open INBOX, and we have mail,
2022-07-01 17:41:29 +00:00
// fetch new mail
2023-11-02 14:28:19 +00:00
if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
2023-12-18 16:09:44 +00:00
match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
2022-07-01 17:41:29 +00:00
Ok(()) => {
2023-11-02 14:28:19 +00:00
incoming_key = updated_incoming_key;
2022-07-01 17:41:29 +00:00
}
Err(e) => {
error!("Could not fetch incoming mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
}
2022-06-30 14:18:08 +00:00
}
drop(rx_inbox_id);
Ok(())
}
2023-11-02 09:38:47 +00:00
async fn handle_incoming_mail(
2022-07-01 18:09:26 +00:00
user: &Arc<User>,
2023-12-18 16:09:44 +00:00
storage: &storage::Store,
2022-07-01 18:09:26 +00:00
inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>,
) -> Result<()> {
2023-12-18 16:09:44 +00:00
let mails_res = storage.blob_list("incoming/").await?;
2023-11-02 14:28:19 +00:00
for object in mails_res {
2022-07-01 18:09:26 +00:00
if !*lock_held.borrow() {
break;
}
2023-12-18 16:09:44 +00:00
let key = object.0;
2023-11-02 14:28:19 +00:00
if let Some(mail_id) = key.strip_prefix("incoming/") {
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
2023-12-18 16:09:44 +00:00
move_incoming_message(user, storage, inbox, mail_id).await?;
2022-07-01 18:09:26 +00:00
}
}
}
Ok(())
}
async fn move_incoming_message(
user: &Arc<User>,
2023-12-18 16:09:44 +00:00
storage: &storage::Store,
2022-07-01 18:09:26 +00:00
inbox: &Arc<Mailbox>,
id: UniqueIdent,
) -> Result<()> {
info!("Moving incoming message: {}", id);
let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3
2023-12-18 16:09:44 +00:00
let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
2022-07-01 18:09:26 +00:00
// 1.a decrypt message key from headers
2023-11-02 14:28:19 +00:00
//info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = object
2023-12-18 16:09:44 +00:00
.meta
.get(MESSAGE_KEY)
2022-07-01 18:09:26 +00:00
.ok_or(anyhow!("Missing key in metadata"))?;
2023-12-18 16:09:44 +00:00
let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
2022-07-01 18:09:26 +00:00
let message_key = sodiumoxide::crypto::sealedbox::open(
&key_encrypted,
&user.creds.keys.public,
&user.creds.keys.secret,
)
.map_err(|_| anyhow!("Cannot decrypt message key"))?;
let message_key =
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body
2023-12-18 16:09:44 +00:00
let obj_body = object.value;
2023-11-02 14:28:19 +00:00
let plain_mail = cryptoblob::open(&obj_body, &message_key)
2022-07-01 18:09:26 +00:00
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
inbox
2023-12-18 16:09:44 +00:00
.append_from_s3(msg, id, object.blob_ref.clone(), message_key)
2022-07-01 18:09:26 +00:00
.await?;
// 3 delete from incoming
2023-12-18 16:09:44 +00:00
storage.blob_rm(&object.blob_ref).await?;
2022-07-01 18:09:26 +00:00
Ok(())
}
2022-07-01 17:26:30 +00:00
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
2023-12-18 16:09:44 +00:00
fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false);
2023-12-18 16:09:44 +00:00
tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
held_rx
}
2022-07-01 17:23:29 +00:00
#[derive(Clone, Debug)]
enum LockState {
Unknown,
Empty,
2023-12-18 16:09:44 +00:00
Held(UniqueIdent, u64, storage::RowRef),
2022-07-01 17:23:29 +00:00
}
async fn k2v_lock_loop_internal(
2023-12-18 16:09:44 +00:00
storage: storage::Store,
row_ref: storage::RowRef,
2022-06-30 18:35:27 +00:00
held_tx: watch::Sender<bool>,
2022-07-01 17:23:29 +00:00
) {
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
let mut state_rx_2 = state_rx.clone();
2022-06-30 18:35:27 +00:00
2022-07-01 17:23:29 +00:00
let our_pid = gen_ident();
2022-06-30 18:35:27 +00:00
2022-07-01 17:23:29 +00:00
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
let watch_lock_loop: BoxFuture<Result<()>> = async {
2023-12-18 16:09:44 +00:00
let mut ct = row_ref.clone();
2022-07-01 17:23:29 +00:00
loop {
2024-01-03 15:52:31 +00:00
debug!("k2v watch lock loop iter: ct = {:?}", ct);
2023-12-18 16:09:44 +00:00
match storage.row_poll(&ct).await {
2022-07-01 17:23:29 +00:00
Err(e) => {
error!(
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
e
);
state_tx.send(LockState::Unknown)?;
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(cv) => {
let mut lock_state = None;
2023-12-18 16:09:44 +00:00
for v in cv.value.iter() {
2023-11-17 11:15:44 +00:00
if let storage::Alternative::Value(vbytes) = v {
2022-07-01 17:23:29 +00:00
if vbytes.len() == 32 {
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
if lock_state
.map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
.unwrap_or(true)
{
lock_state = Some((pid, ts));
}
}
}
}
2023-12-18 16:09:44 +00:00
let new_ct = cv.row_ref;
2023-11-17 11:15:44 +00:00
2024-01-03 15:52:31 +00:00
debug!(
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
2023-11-17 11:15:44 +00:00
ct, new_ct, lock_state
);
2022-07-01 17:23:29 +00:00
state_tx.send(
lock_state
2023-12-18 16:09:44 +00:00
.map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
2022-07-01 17:23:29 +00:00
.unwrap_or(LockState::Empty),
)?;
2023-11-17 11:15:44 +00:00
ct = new_ct;
2022-07-01 17:23:29 +00:00
}
2022-06-30 18:35:27 +00:00
}
2022-07-01 17:23:29 +00:00
}
}
.boxed();
2022-06-30 18:35:27 +00:00
2022-07-01 17:23:29 +00:00
// Loop 2: notify user whether we are holding the lock or not
let lock_notify_loop: BoxFuture<Result<()>> = async {
loop {
let now = now_msec();
let held_with_expiration_time = match &*state_rx.borrow_and_update() {
LockState::Held(pid, ts, _ct) if *pid == our_pid => {
let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
if now < expiration_time {
Some(expiration_time)
} else {
None
}
}
_ => None,
};
let held = held_with_expiration_time.is_some();
if held != *held_tx.borrow() {
held_tx.send(held)?;
}
2022-06-30 18:35:27 +00:00
2022-07-01 17:23:29 +00:00
let await_expired = async {
match held_with_expiration_time {
None => futures::future::pending().await,
Some(expiration_time) => {
tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
2022-06-30 18:35:27 +00:00
}
2022-07-01 17:23:29 +00:00
};
};
tokio::select!(
r = state_rx.changed() => {
r?;
}
_ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
_ = await_expired => continue,
);
}
}
.boxed();
// Loop 3: acquire lock when relevant
let take_lock_loop: BoxFuture<Result<()>> = async {
loop {
let now = now_msec();
let state: LockState = state_rx_2.borrow_and_update().clone();
let (acquire_at, ct) = match state {
LockState::Unknown => {
// If state of the lock is unknown, don't try to acquire
state_rx_2.changed().await?;
continue;
}
LockState::Empty => (now, None),
LockState::Held(pid, ts, ct) => {
if pid == our_pid {
(ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
} else {
(ts, Some(ct))
2022-06-30 18:35:27 +00:00
}
}
2022-07-01 17:23:29 +00:00
};
// Wait until it is time to acquire lock
if acquire_at > now {
tokio::select!(
r = state_rx_2.changed() => {
// If lock state changed in the meantime, don't acquire and loop around
r?;
continue;
}
_ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
);
2022-06-30 18:35:27 +00:00
}
2022-07-01 17:23:29 +00:00
// Acquire lock
let mut lock = vec![0u8; 32];
lock[..8].copy_from_slice(&u64::to_be_bytes(
now_msec() + LOCK_DURATION.as_millis() as u64,
));
2022-07-01 17:23:29 +00:00
lock[8..].copy_from_slice(&our_pid.0);
2023-11-17 11:15:44 +00:00
let row = match ct {
2023-12-18 16:09:44 +00:00
Some(existing) => existing,
None => row_ref.clone(),
2023-11-17 11:15:44 +00:00
};
2023-12-27 13:58:28 +00:00
if let Err(e) = storage
.row_insert(vec![storage::RowVal::new(row, lock)])
.await
{
2022-07-01 17:23:29 +00:00
error!("Could not take lock: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
// Wait for new information to loop back
state_rx_2.changed().await?;
2022-06-30 18:35:27 +00:00
}
}
2022-07-01 17:23:29 +00:00
.boxed();
2022-07-15 15:55:04 +00:00
let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
2022-07-01 17:33:47 +00:00
2024-01-03 15:52:31 +00:00
debug!("lock loop exited, releasing");
2022-07-01 17:33:47 +00:00
if !held_tx.is_closed() {
2023-12-18 16:09:44 +00:00
warn!("weird...");
2022-07-01 17:33:47 +00:00
let _ = held_tx.send(false);
}
// If lock is ours, release it
let release = match &*state_rx.borrow() {
LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
_ => None,
};
if let Some(ct) = release {
2023-12-27 13:58:09 +00:00
match storage.row_rm(&storage::Selector::Single(&ct)).await {
2023-12-18 16:09:44 +00:00
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
Ok(_) => (),
};
2022-07-01 17:33:47 +00:00
}
2022-06-30 18:35:27 +00:00
}
2022-07-01 17:26:30 +00:00
// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
pub struct EncryptedMessage {
key: cryptoblob::Key,
encrypted_body: Vec<u8>,
}
impl EncryptedMessage {
pub fn new(body: Vec<u8>) -> Result<Self> {
let key = cryptoblob::gen_key();
let encrypted_body = cryptoblob::seal(&body, &key)?;
Ok(Self {
key,
encrypted_body,
})
}
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
2023-12-21 20:54:36 +00:00
let storage = creds.storage.build().await?;
// Get causality token of previous watch key
2023-12-18 16:09:44 +00:00
let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
2023-11-17 11:15:44 +00:00
Err(_) => query,
2023-12-18 16:09:44 +00:00
Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
};
// Write mail to encrypted storage
let encrypted_key =
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
2023-12-18 16:09:44 +00:00
let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
2023-12-18 16:09:44 +00:00
let blob_val = storage::BlobVal::new(
storage::BlobRef(format!("incoming/{}", gen_ident())),
self.encrypted_body.clone().into(),
2023-12-27 13:58:28 +00:00
)
.with_meta(MESSAGE_KEY.to_string(), key_header);
2023-12-22 18:32:07 +00:00
storage.blob_insert(blob_val).await?;
// Update watch key to signal new mail
2023-12-27 13:58:28 +00:00
let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
2023-12-18 16:09:44 +00:00
storage.row_insert(vec![watch_val]).await?;
Ok(())
}
2022-06-30 14:18:08 +00:00
}