Skeleton for incoming mail processor

This commit is contained in:
Alex 2022-06-30 16:18:08 +02:00
parent 3277fb16a8
commit 484ad97b65
Signed by: lx
GPG Key ID: 0E496D15096376BE
3 changed files with 36 additions and 13 deletions

16
src/mail/incoming.rs Normal file
View File

@ -0,0 +1,16 @@
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::watch;
use crate::mail::unique_ident::UniqueIdent;
use crate::mail::user::User;
use crate::mail::uidindex::ImapUidvalidity;
pub async fn incoming_mail_watch_process(user: Weak<User>, rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>) {
while Weak::upgrade(&user).is_some() {
eprintln!("User still available");
tokio::time::sleep(Duration::from_secs(10)).await;
}
drop(rx_inbox_id);
}

View File

@ -4,6 +4,7 @@ pub mod mailbox;
pub mod uidindex;
pub mod unique_ident;
pub mod user;
pub mod incoming;
// Internet Message Format
// aka RFC 822 - RFC 2822 - RFC 5322

View File

@ -12,6 +12,7 @@ use crate::login::{Credentials, StorageCredentials};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::{gen_ident, UniqueIdent};
use crate::mail::incoming::incoming_mail_watch_process;
use crate::time::now_msec;
const MAILBOX_HIERARCHY_DELIMITER: &str = "/";
@ -70,6 +71,7 @@ impl User {
/// Opens an existing mailbox given its IMAP name.
pub async fn open_mailbox(&self, name: &str) -> Result<Option<Arc<Mailbox>>> {
let (mut list, ct) = self.load_mailbox_list().await?;
eprintln!("List of mailboxes: {:?}", list);
match list.get_mut(name) {
Some(MailboxListEntry {
id_lww: (_, Some(mbid)),
@ -126,6 +128,8 @@ impl User {
// Ensure INBOX exists (done inside load_mailbox_list)
user.load_mailbox_list().await?;
tokio::spawn(incoming_mail_watch_process(Arc::downgrade(&user), rx_inbox_id));
Ok(user)
}
@ -156,26 +160,28 @@ impl User {
// ---- Mailbox list management ----
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> {
let cv = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await {
Err(k2v_client::Error::NotFound) => return Ok((BTreeMap::new(), None)),
let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await {
Err(k2v_client::Error::NotFound) => (BTreeMap::new(), None),
Err(e) => return Err(e.into()),
Ok(cv) => cv,
Ok(cv) => {
let mut list = BTreeMap::new();
for v in cv.value {
if let K2vValue::Value(vbytes) = v {
let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list = merge_mailbox_lists(list, list2);
}
}
(list, Some(cv.causality))
},
};
let mut list = BTreeMap::new();
for v in cv.value {
if let K2vValue::Value(vbytes) = v {
let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list = merge_mailbox_lists(list, list2);
}
}
// If INBOX doesn't exist, create a new mailbox with that name
// and save new mailbox list.
// Also, ensure that the mpsc::watch that keeps track of the
// inbox id is up-to-date.
let (inbox_id, inbox_uidvalidity) = match self
.create_mailbox_internal(&mut list, Some(cv.causality.clone()), INBOX)
.create_mailbox_internal(&mut list, ct.clone(), INBOX)
.await?
{
CreatedMailbox::Created(i, v) => (i, v),
@ -185,7 +191,7 @@ impl User {
.send(Some((inbox_id, inbox_uidvalidity)))
.unwrap();
Ok((list, Some(cv.causality)))
Ok((list, ct))
}
async fn save_mailbox_list(
@ -243,7 +249,7 @@ impl User {
type MailboxList = BTreeMap<String, MailboxListEntry>;
#[derive(Serialize, Deserialize, Clone, Copy)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
struct MailboxListEntry {
id_lww: (u64, Option<UniqueIdent>),
uidvalidity: ImapUidvalidity,