2023-11-17 11:15:44 +00:00
|
|
|
//use std::collections::HashMap;
|
2022-07-01 18:09:26 +00:00
|
|
|
use std::convert::TryFrom;
|
2022-07-12 13:31:29 +00:00
|
|
|
|
2022-06-30 14:18:08 +00:00
|
|
|
use std::sync::{Arc, Weak};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
use anyhow::{anyhow, bail, Result};
|
2023-12-18 16:09:44 +00:00
|
|
|
use base64::Engine;
|
2022-07-12 13:31:29 +00:00
|
|
|
use futures::{future::BoxFuture, FutureExt};
|
2023-11-17 11:15:44 +00:00
|
|
|
//use tokio::io::AsyncReadExt;
|
2022-06-30 14:18:08 +00:00
|
|
|
use tokio::sync::watch;
|
2024-01-03 15:52:31 +00:00
|
|
|
use tracing::{debug, error, info, warn};
|
2022-06-30 14:18:08 +00:00
|
|
|
|
2022-06-30 15:40:59 +00:00
|
|
|
use crate::cryptoblob;
|
|
|
|
use crate::login::{Credentials, PublicCredentials};
|
|
|
|
use crate::mail::mailbox::Mailbox;
|
2022-06-30 14:18:08 +00:00
|
|
|
use crate::mail::uidindex::ImapUidvalidity;
|
2022-06-30 15:40:59 +00:00
|
|
|
use crate::mail::unique_ident::*;
|
|
|
|
use crate::mail::user::User;
|
2022-07-01 18:09:26 +00:00
|
|
|
use crate::mail::IMF;
|
2023-11-02 14:28:19 +00:00
|
|
|
use crate::storage;
|
2023-11-15 14:56:43 +00:00
|
|
|
use crate::timestamp::now_msec;
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
/// First component of the row references used for incoming-mail coordination
/// (presumably the partition key; both the lock row and the watch row use it).
const INCOMING_PK: &str = "incoming";
/// Second component of the row reference used as the incoming-mail lock.
const INCOMING_LOCK_SK: &str = "lock";
/// Second component of the row reference that is rewritten to signal new mail.
const INCOMING_WATCH_SK: &str = "watch";

/// Blob metadata entry under which the base64-encoded sealed message key is stored.
const MESSAGE_KEY: &str = "message-key";

// When a lock is held, it is held for LOCK_DURATION (here 5 minutes).
// It is renewed every LOCK_DURATION/3.
// If we are at 2*LOCK_DURATION/3 and haven't renewed, we assume we
// lost the lock.
const LOCK_DURATION: Duration = Duration::from_secs(5 * 60);

// In addition to checking when notified, also check for new mail every 10 minutes.
const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(10 * 60);
|
|
|
|
|
2022-06-30 15:40:59 +00:00
|
|
|
pub async fn incoming_mail_watch_process(
|
|
|
|
user: Weak<User>,
|
|
|
|
creds: Credentials,
|
|
|
|
rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
|
|
|
|
) {
|
|
|
|
if let Err(e) = incoming_mail_watch_process_internal(user, creds, rx_inbox_id).await {
|
|
|
|
error!("Error in incoming mail watch process: {}", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn incoming_mail_watch_process_internal(
|
|
|
|
user: Weak<User>,
|
|
|
|
creds: Credentials,
|
|
|
|
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
|
|
|
|
) -> Result<()> {
|
2023-12-27 13:58:28 +00:00
|
|
|
let mut lock_held = k2v_lock_loop(
|
|
|
|
creds.storage.build().await?,
|
|
|
|
storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK),
|
|
|
|
);
|
2023-12-21 20:54:36 +00:00
|
|
|
let storage = creds.storage.build().await?;
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
let mut inbox: Option<Arc<Mailbox>> = None;
|
2023-12-18 16:09:44 +00:00
|
|
|
let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
|
2022-06-30 14:18:08 +00:00
|
|
|
|
2022-06-30 15:40:59 +00:00
|
|
|
loop {
|
2023-11-02 14:28:19 +00:00
|
|
|
let maybe_updated_incoming_key = if *lock_held.borrow() {
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("incoming lock held");
|
2022-06-30 18:35:27 +00:00
|
|
|
|
|
|
|
let wait_new_mail = async {
|
|
|
|
loop {
|
2023-12-27 13:58:28 +00:00
|
|
|
match storage.row_poll(&incoming_key).await {
|
2023-12-18 16:09:44 +00:00
|
|
|
Ok(row_val) => break row_val.row_ref,
|
2022-06-30 18:35:27 +00:00
|
|
|
Err(e) => {
|
|
|
|
error!("Error in wait_new_mail: {}", e);
|
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2022-06-30 15:40:59 +00:00
|
|
|
tokio::select! {
|
2023-11-02 14:28:19 +00:00
|
|
|
inc_k = wait_new_mail => Some(inc_k),
|
2023-12-18 16:09:44 +00:00
|
|
|
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
|
2023-11-02 14:28:19 +00:00
|
|
|
_ = lock_held.changed() => None,
|
|
|
|
_ = rx_inbox_id.changed() => None,
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
|
|
|
} else {
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("incoming lock not held");
|
2022-06-30 15:40:59 +00:00
|
|
|
tokio::select! {
|
|
|
|
_ = lock_held.changed() => None,
|
|
|
|
_ = rx_inbox_id.changed() => None,
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2022-07-01 17:41:29 +00:00
|
|
|
let user = match Weak::upgrade(&user) {
|
|
|
|
Some(user) => user,
|
|
|
|
None => {
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("User no longer available, exiting incoming loop.");
|
2022-07-01 17:41:29 +00:00
|
|
|
break;
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
2022-07-01 17:41:29 +00:00
|
|
|
};
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("User still available");
|
2022-07-01 17:41:29 +00:00
|
|
|
|
|
|
|
// If INBOX no longer is same mailbox, open new mailbox
|
2023-05-15 16:23:23 +00:00
|
|
|
let inbox_id = *rx_inbox_id.borrow();
|
2022-07-01 17:41:29 +00:00
|
|
|
if let Some((id, uidvalidity)) = inbox_id {
|
|
|
|
if Some(id) != inbox.as_ref().map(|b| b.id) {
|
|
|
|
match user.open_mailbox_by_id(id, uidvalidity).await {
|
|
|
|
Ok(mb) => {
|
2022-07-21 10:50:44 +00:00
|
|
|
inbox = Some(mb);
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
|
|
|
Err(e) => {
|
2022-07-01 17:41:29 +00:00
|
|
|
inbox = None;
|
|
|
|
error!("Error when opening inbox ({}): {}", id, e);
|
2022-06-30 15:40:59 +00:00
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
2022-07-01 17:41:29 +00:00
|
|
|
continue;
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-07-01 17:41:29 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 18:09:26 +00:00
|
|
|
// If we were able to open INBOX, and we have mail,
|
2022-07-01 17:41:29 +00:00
|
|
|
// fetch new mail
|
2023-11-02 14:28:19 +00:00
|
|
|
if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
|
2023-12-18 16:09:44 +00:00
|
|
|
match handle_incoming_mail(&user, &storage, inbox, &lock_held).await {
|
2022-07-01 17:41:29 +00:00
|
|
|
Ok(()) => {
|
2023-11-02 14:28:19 +00:00
|
|
|
incoming_key = updated_incoming_key;
|
2022-07-01 17:41:29 +00:00
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
error!("Could not fetch incoming mail: {}", e);
|
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
|
}
|
|
|
|
}
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
2022-06-30 14:18:08 +00:00
|
|
|
}
|
|
|
|
drop(rx_inbox_id);
|
2022-06-30 15:40:59 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-11-02 09:38:47 +00:00
|
|
|
async fn handle_incoming_mail(
|
2022-07-01 18:09:26 +00:00
|
|
|
user: &Arc<User>,
|
2023-12-18 16:09:44 +00:00
|
|
|
storage: &storage::Store,
|
2022-07-01 18:09:26 +00:00
|
|
|
inbox: &Arc<Mailbox>,
|
|
|
|
lock_held: &watch::Receiver<bool>,
|
|
|
|
) -> Result<()> {
|
2023-12-18 16:09:44 +00:00
|
|
|
let mails_res = storage.blob_list("incoming/").await?;
|
2023-11-02 14:28:19 +00:00
|
|
|
|
|
|
|
for object in mails_res {
|
2022-07-01 18:09:26 +00:00
|
|
|
if !*lock_held.borrow() {
|
|
|
|
break;
|
|
|
|
}
|
2023-12-18 16:09:44 +00:00
|
|
|
let key = object.0;
|
2023-11-02 14:28:19 +00:00
|
|
|
if let Some(mail_id) = key.strip_prefix("incoming/") {
|
|
|
|
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
|
2023-12-18 16:09:44 +00:00
|
|
|
move_incoming_message(user, storage, inbox, mail_id).await?;
|
2022-07-01 18:09:26 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn move_incoming_message(
|
|
|
|
user: &Arc<User>,
|
2023-12-18 16:09:44 +00:00
|
|
|
storage: &storage::Store,
|
2022-07-01 18:09:26 +00:00
|
|
|
inbox: &Arc<Mailbox>,
|
|
|
|
id: UniqueIdent,
|
|
|
|
) -> Result<()> {
|
|
|
|
info!("Moving incoming message: {}", id);
|
|
|
|
|
|
|
|
let object_key = format!("incoming/{}", id);
|
|
|
|
|
|
|
|
// 1. Fetch message from S3
|
2023-12-18 16:09:44 +00:00
|
|
|
let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?;
|
2022-07-01 18:09:26 +00:00
|
|
|
|
|
|
|
// 1.a decrypt message key from headers
|
2023-11-02 14:28:19 +00:00
|
|
|
//info!("Object metadata: {:?}", get_result.metadata);
|
|
|
|
let key_encrypted_b64 = object
|
2023-12-18 16:09:44 +00:00
|
|
|
.meta
|
|
|
|
.get(MESSAGE_KEY)
|
2022-07-01 18:09:26 +00:00
|
|
|
.ok_or(anyhow!("Missing key in metadata"))?;
|
2023-12-18 16:09:44 +00:00
|
|
|
let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?;
|
2022-07-01 18:09:26 +00:00
|
|
|
let message_key = sodiumoxide::crypto::sealedbox::open(
|
|
|
|
&key_encrypted,
|
|
|
|
&user.creds.keys.public,
|
|
|
|
&user.creds.keys.secret,
|
|
|
|
)
|
|
|
|
.map_err(|_| anyhow!("Cannot decrypt message key"))?;
|
|
|
|
let message_key =
|
|
|
|
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
|
|
|
|
|
|
|
|
// 1.b retrieve message body
|
2023-12-18 16:09:44 +00:00
|
|
|
let obj_body = object.value;
|
2023-11-02 14:28:19 +00:00
|
|
|
let plain_mail = cryptoblob::open(&obj_body, &message_key)
|
2022-07-01 18:09:26 +00:00
|
|
|
.map_err(|_| anyhow!("Cannot decrypt email content"))?;
|
|
|
|
|
|
|
|
// 2 parse mail and add to inbox
|
|
|
|
let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?;
|
|
|
|
inbox
|
2023-12-18 16:09:44 +00:00
|
|
|
.append_from_s3(msg, id, object.blob_ref.clone(), message_key)
|
2022-07-01 18:09:26 +00:00
|
|
|
.await?;
|
|
|
|
|
2022-07-04 10:44:48 +00:00
|
|
|
// 3 delete from incoming
|
2023-12-18 16:09:44 +00:00
|
|
|
storage.blob_rm(&object.blob_ref).await?;
|
2022-07-04 10:44:48 +00:00
|
|
|
|
2022-07-01 18:09:26 +00:00
|
|
|
Ok(())
|
2022-06-30 15:40:59 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 17:26:30 +00:00
|
|
|
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
|
|
|
|
|
2023-12-18 16:09:44 +00:00
|
|
|
fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> {
|
2022-06-30 15:40:59 +00:00
|
|
|
let (held_tx, held_rx) = watch::channel(false);
|
|
|
|
|
2023-12-18 16:09:44 +00:00
|
|
|
tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx));
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
held_rx
|
|
|
|
}
|
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
enum LockState {
|
|
|
|
Unknown,
|
|
|
|
Empty,
|
2023-12-18 16:09:44 +00:00
|
|
|
Held(UniqueIdent, u64, storage::RowRef),
|
2022-07-01 17:23:29 +00:00
|
|
|
}
|
|
|
|
|
2022-06-30 15:40:59 +00:00
|
|
|
async fn k2v_lock_loop_internal(
|
2023-12-18 16:09:44 +00:00
|
|
|
storage: storage::Store,
|
|
|
|
row_ref: storage::RowRef,
|
2022-06-30 18:35:27 +00:00
|
|
|
held_tx: watch::Sender<bool>,
|
2022-07-01 17:23:29 +00:00
|
|
|
) {
|
|
|
|
let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown);
|
|
|
|
let mut state_rx_2 = state_rx.clone();
|
2022-06-30 18:35:27 +00:00
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
let our_pid = gen_ident();
|
2022-06-30 18:35:27 +00:00
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
// Loop 1: watch state of lock in K2V, save that in corresponding watch channel
|
|
|
|
let watch_lock_loop: BoxFuture<Result<()>> = async {
|
2023-12-18 16:09:44 +00:00
|
|
|
let mut ct = row_ref.clone();
|
2022-07-01 17:23:29 +00:00
|
|
|
loop {
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("k2v watch lock loop iter: ct = {:?}", ct);
|
2023-12-18 16:09:44 +00:00
|
|
|
match storage.row_poll(&ct).await {
|
2022-07-01 17:23:29 +00:00
|
|
|
Err(e) => {
|
|
|
|
error!(
|
|
|
|
"Error in k2v wait value changed: {} ; assuming we no longer hold lock.",
|
|
|
|
e
|
|
|
|
);
|
|
|
|
state_tx.send(LockState::Unknown)?;
|
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
|
}
|
|
|
|
Ok(cv) => {
|
|
|
|
let mut lock_state = None;
|
2023-12-18 16:09:44 +00:00
|
|
|
for v in cv.value.iter() {
|
2023-11-17 11:15:44 +00:00
|
|
|
if let storage::Alternative::Value(vbytes) = v {
|
2022-07-01 17:23:29 +00:00
|
|
|
if vbytes.len() == 32 {
|
|
|
|
let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap());
|
|
|
|
let pid = UniqueIdent(vbytes[8..].try_into().unwrap());
|
|
|
|
if lock_state
|
|
|
|
.map(|(pid2, ts2)| ts > ts2 || (ts == ts2 && pid > pid2))
|
|
|
|
.unwrap_or(true)
|
|
|
|
{
|
|
|
|
lock_state = Some((pid, ts));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-12-18 16:09:44 +00:00
|
|
|
let new_ct = cv.row_ref;
|
2023-11-17 11:15:44 +00:00
|
|
|
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!(
|
2022-07-04 10:44:48 +00:00
|
|
|
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
|
2023-11-17 11:15:44 +00:00
|
|
|
ct, new_ct, lock_state
|
2022-07-04 10:44:48 +00:00
|
|
|
);
|
2022-07-01 17:23:29 +00:00
|
|
|
state_tx.send(
|
|
|
|
lock_state
|
2023-12-18 16:09:44 +00:00
|
|
|
.map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone()))
|
2022-07-01 17:23:29 +00:00
|
|
|
.unwrap_or(LockState::Empty),
|
|
|
|
)?;
|
2023-11-17 11:15:44 +00:00
|
|
|
ct = new_ct;
|
2022-07-01 17:23:29 +00:00
|
|
|
}
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
2022-07-01 17:23:29 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
.boxed();
|
2022-06-30 18:35:27 +00:00
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
// Loop 2: notify user whether we are holding the lock or not
|
|
|
|
let lock_notify_loop: BoxFuture<Result<()>> = async {
|
|
|
|
loop {
|
|
|
|
let now = now_msec();
|
|
|
|
let held_with_expiration_time = match &*state_rx.borrow_and_update() {
|
|
|
|
LockState::Held(pid, ts, _ct) if *pid == our_pid => {
|
|
|
|
let expiration_time = *ts - (LOCK_DURATION / 3).as_millis() as u64;
|
|
|
|
if now < expiration_time {
|
|
|
|
Some(expiration_time)
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => None,
|
|
|
|
};
|
2022-07-04 10:44:48 +00:00
|
|
|
let held = held_with_expiration_time.is_some();
|
|
|
|
if held != *held_tx.borrow() {
|
|
|
|
held_tx.send(held)?;
|
|
|
|
}
|
2022-06-30 18:35:27 +00:00
|
|
|
|
2022-07-01 17:23:29 +00:00
|
|
|
let await_expired = async {
|
|
|
|
match held_with_expiration_time {
|
|
|
|
None => futures::future::pending().await,
|
|
|
|
Some(expiration_time) => {
|
|
|
|
tokio::time::sleep(Duration::from_millis(expiration_time - now)).await
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
2022-07-01 17:23:29 +00:00
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
tokio::select!(
|
|
|
|
r = state_rx.changed() => {
|
|
|
|
r?;
|
|
|
|
}
|
|
|
|
_ = held_tx.closed() => bail!("held_tx closed, don't need to hold lock anymore"),
|
|
|
|
_ = await_expired => continue,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
.boxed();
|
|
|
|
|
|
|
|
// Loop 3: acquire lock when relevant
|
|
|
|
let take_lock_loop: BoxFuture<Result<()>> = async {
|
|
|
|
loop {
|
|
|
|
let now = now_msec();
|
|
|
|
let state: LockState = state_rx_2.borrow_and_update().clone();
|
|
|
|
let (acquire_at, ct) = match state {
|
|
|
|
LockState::Unknown => {
|
|
|
|
// If state of the lock is unknown, don't try to acquire
|
|
|
|
state_rx_2.changed().await?;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
LockState::Empty => (now, None),
|
|
|
|
LockState::Held(pid, ts, ct) => {
|
|
|
|
if pid == our_pid {
|
|
|
|
(ts - (2 * LOCK_DURATION / 3).as_millis() as u64, Some(ct))
|
|
|
|
} else {
|
|
|
|
(ts, Some(ct))
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
|
|
|
}
|
2022-07-01 17:23:29 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
// Wait until it is time to acquire lock
|
|
|
|
if acquire_at > now {
|
|
|
|
tokio::select!(
|
|
|
|
r = state_rx_2.changed() => {
|
|
|
|
// If lock state changed in the meantime, don't acquire and loop around
|
|
|
|
r?;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
_ = tokio::time::sleep(Duration::from_millis(acquire_at - now)) => ()
|
|
|
|
);
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
2022-07-01 17:23:29 +00:00
|
|
|
|
|
|
|
// Acquire lock
|
|
|
|
let mut lock = vec![0u8; 32];
|
2022-07-04 10:44:48 +00:00
|
|
|
lock[..8].copy_from_slice(&u64::to_be_bytes(
|
|
|
|
now_msec() + LOCK_DURATION.as_millis() as u64,
|
|
|
|
));
|
2022-07-01 17:23:29 +00:00
|
|
|
lock[8..].copy_from_slice(&our_pid.0);
|
2023-11-17 11:15:44 +00:00
|
|
|
let row = match ct {
|
2023-12-18 16:09:44 +00:00
|
|
|
Some(existing) => existing,
|
|
|
|
None => row_ref.clone(),
|
2023-11-17 11:15:44 +00:00
|
|
|
};
|
2023-12-27 13:58:28 +00:00
|
|
|
if let Err(e) = storage
|
|
|
|
.row_insert(vec![storage::RowVal::new(row, lock)])
|
|
|
|
.await
|
|
|
|
{
|
2022-07-01 17:23:29 +00:00
|
|
|
error!("Could not take lock: {}", e);
|
|
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for new information to loop back
|
|
|
|
state_rx_2.changed().await?;
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
|
|
|
}
|
2022-07-01 17:23:29 +00:00
|
|
|
.boxed();
|
|
|
|
|
2022-07-15 15:55:04 +00:00
|
|
|
let _ = futures::try_join!(watch_lock_loop, lock_notify_loop, take_lock_loop);
|
2022-07-01 17:33:47 +00:00
|
|
|
|
2024-01-03 15:52:31 +00:00
|
|
|
debug!("lock loop exited, releasing");
|
2022-07-01 17:33:47 +00:00
|
|
|
|
|
|
|
if !held_tx.is_closed() {
|
2023-12-18 16:09:44 +00:00
|
|
|
warn!("weird...");
|
2022-07-01 17:33:47 +00:00
|
|
|
let _ = held_tx.send(false);
|
|
|
|
}
|
|
|
|
|
|
|
|
// If lock is ours, release it
|
|
|
|
let release = match &*state_rx.borrow() {
|
|
|
|
LockState::Held(pid, _, ct) if *pid == our_pid => Some(ct.clone()),
|
|
|
|
_ => None,
|
|
|
|
};
|
|
|
|
if let Some(ct) = release {
|
2023-12-27 13:58:09 +00:00
|
|
|
match storage.row_rm(&storage::Selector::Single(&ct)).await {
|
2023-12-18 16:09:44 +00:00
|
|
|
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
|
|
|
|
Ok(_) => (),
|
|
|
|
};
|
2022-07-01 17:33:47 +00:00
|
|
|
}
|
2022-06-30 18:35:27 +00:00
|
|
|
}
|
|
|
|
|
2022-07-01 17:26:30 +00:00
|
|
|
// ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
pub struct EncryptedMessage {
|
|
|
|
key: cryptoblob::Key,
|
|
|
|
encrypted_body: Vec<u8>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl EncryptedMessage {
|
|
|
|
pub fn new(body: Vec<u8>) -> Result<Self> {
|
|
|
|
let key = cryptoblob::gen_key();
|
|
|
|
let encrypted_body = cryptoblob::seal(&body, &key)?;
|
|
|
|
Ok(Self {
|
|
|
|
key,
|
|
|
|
encrypted_body,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> {
|
2023-12-21 20:54:36 +00:00
|
|
|
let storage = creds.storage.build().await?;
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
// Get causality token of previous watch key
|
2023-12-18 16:09:44 +00:00
|
|
|
let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK);
|
|
|
|
let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await {
|
2023-11-17 11:15:44 +00:00
|
|
|
Err(_) => query,
|
2023-12-18 16:09:44 +00:00
|
|
|
Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query),
|
2022-06-30 15:40:59 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
// Write mail to encrypted storage
|
|
|
|
let encrypted_key =
|
|
|
|
sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key);
|
2023-12-18 16:09:44 +00:00
|
|
|
let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key);
|
2022-06-30 15:40:59 +00:00
|
|
|
|
2023-12-18 16:09:44 +00:00
|
|
|
let blob_val = storage::BlobVal::new(
|
|
|
|
storage::BlobRef(format!("incoming/{}", gen_ident())),
|
|
|
|
self.encrypted_body.clone().into(),
|
2023-12-27 13:58:28 +00:00
|
|
|
)
|
|
|
|
.with_meta(MESSAGE_KEY.to_string(), key_header);
|
2023-12-22 18:32:07 +00:00
|
|
|
storage.blob_insert(blob_val).await?;
|
2022-06-30 15:40:59 +00:00
|
|
|
|
|
|
|
// Update watch key to signal new mail
|
2023-12-27 13:58:28 +00:00
|
|
|
let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec());
|
2023-12-18 16:09:44 +00:00
|
|
|
storage.row_insert(vec![watch_val]).await?;
|
2022-06-30 15:40:59 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
2022-06-30 14:18:08 +00:00
|
|
|
}
|