small rewrite

This commit is contained in:
Alex 2022-07-01 19:41:29 +02:00
parent a8e33383f4
commit 14b420d9df
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 35 additions and 29 deletions

View File

@ -28,6 +28,9 @@ const INCOMING_WATCH_SK: &str = "watch";
// lost the lock. // lost the lock.
const LOCK_DURATION: Duration = Duration::from_secs(300); const LOCK_DURATION: Duration = Duration::from_secs(300);
// In addition to checking when notified, also check for new mail every 10 minutes
const MAIL_CHECK_INTERVAL: Duration = Duration::from_secs(600);
pub async fn incoming_mail_watch_process( pub async fn incoming_mail_watch_process(
user: Weak<User>, user: Weak<User>,
creds: Credentials, creds: Credentials,
@ -71,7 +74,7 @@ async fn incoming_mail_watch_process_internal(
tokio::select! { tokio::select! {
cv = wait_new_mail => Some(cv.causality), cv = wait_new_mail => Some(cv.causality),
_ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(), _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(),
_ = lock_held.changed() => None, _ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None, _ = rx_inbox_id.changed() => None,
} }
@ -83,42 +86,45 @@ async fn incoming_mail_watch_process_internal(
} }
}; };
if let Some(user) = Weak::upgrade(&user) { let user = match Weak::upgrade(&user) {
info!("User still available"); Some(user) => user,
None => {
// If INBOX no longer is same mailbox, open new mailbox info!("User no longer available, exiting incoming loop.");
let inbox_id = rx_inbox_id.borrow().clone(); break;
if let Some((id, uidvalidity)) = inbox_id {
if Some(id) != inbox.as_ref().map(|b| b.id) {
match user.open_mailbox_by_id(id, uidvalidity).await {
Ok(mb) => {
inbox = mb;
}
Err(e) => {
inbox = None;
error!("Error when opening inbox ({}): {}", id, e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
}
} }
};
info!("User still available");
// If we were able to open INBOX, and we have mail (implies lock is held), // If INBOX no longer is same mailbox, open new mailbox
// fetch new mail let inbox_id = rx_inbox_id.borrow().clone();
if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { if let Some((id, uidvalidity)) = inbox_id {
match handle_incoming_mail(&user, &s3, inbox).await { if Some(id) != inbox.as_ref().map(|b| b.id) {
Ok(()) => { match user.open_mailbox_by_id(id, uidvalidity).await {
prev_ct = Some(new_ct); Ok(mb) => {
inbox = mb;
} }
Err(e) => { Err(e) => {
error!("Could not fetch incoming mail: {}", e); inbox = None;
error!("Error when opening inbox ({}): {}", id, e);
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
continue;
} }
} }
} }
} else { }
info!("User no longer available, exiting incoming loop.");
break; // If we were able to open INBOX, and we have mail (implies lock is held),
// fetch new mail
if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) {
match handle_incoming_mail(&user, &s3, inbox).await {
Ok(()) => {
prev_ct = Some(new_ct);
}
Err(e) => {
error!("Could not fetch incoming mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
} }
} }
drop(rx_inbox_id); drop(rx_inbox_id);