WIP incoming loop

This commit is contained in:
Alex 2022-06-30 20:35:27 +02:00
parent 2c61af6684
commit 2189f8b64b
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 85 additions and 51 deletions

View File

@ -3,10 +3,10 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Result;
use k2v_client::{CausalityToken, K2vClient, K2vValue};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{PutObjectRequest, S3Client, S3};
use tokio::sync::watch;
use tracing::error;
use tracing::{error, info};
use crate::cryptoblob;
use crate::login::{Credentials, PublicCredentials};
@ -45,13 +45,30 @@ async fn incoming_mail_watch_process_internal(
loop {
let new_mail = if *lock_held.borrow() {
info!("incoming lock held");
let wait_new_mail = async {
loop {
match k2v_wait_value_changed(&k2v, &INCOMING_PK, &INCOMING_WATCH_SK, &prev_ct)
Ok(cv) => break cv,
Err(e) => {
error!("Error in wait_new_mail: {}", e);
tokio::select! {
ct = wait_new_mail(&k2v, &prev_ct) => Some(ct),
cv = wait_new_mail => Some(cv.causality),
_ = tokio::time::sleep(Duration::from_secs(300)) => prev_ct.take(),
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
} else {
info!("incoming lock not held");
tokio::select! {
_ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None,
@ -59,7 +76,7 @@ async fn incoming_mail_watch_process_internal(
if let Some(user) = Weak::upgrade(&user) {
eprintln!("User still available");
info!("User still available");
// If INBOX no longer is same mailbox, open new mailbox
let inbox_id = rx_inbox_id.borrow().clone();
@ -92,7 +109,7 @@ async fn incoming_mail_watch_process_internal(
} else {
eprintln!("User no longer available, exiting incoming loop.");
info!("User no longer available, exiting incoming loop.");
@ -100,45 +117,6 @@ async fn incoming_mail_watch_process_internal(
async fn wait_new_mail(k2v: &K2vClient, prev_ct: &Option<CausalityToken>) -> CausalityToken {
loop {
if let Some(ct) = &prev_ct {
match k2v
.poll_item(INCOMING_PK, INCOMING_WATCH_SK, ct.clone(), None)
Err(e) => {
error!("Error when waiting for incoming watch: {}, sleeping", e);
Ok(None) => continue,
Ok(Some(cv)) => {
return cv.causality;
} else {
match k2v.read_item(INCOMING_PK, INCOMING_WATCH_SK).await {
Err(k2v_client::Error::NotFound) => {
if let Err(e) = k2v
.insert_item(INCOMING_PK, INCOMING_WATCH_SK, vec![0u8], None)
error!("Error when waiting for incoming watch: {}, sleeping", e);
Err(e) => {
error!("Error when waiting for incoming watch: {}, sleeping", e);
Ok(cv) => {
return cv.causality;
async fn handle_incoming_mail(user: &Arc<User>, s3: &S3Client, inbox: &Arc<Mailbox>) -> Result<()> {
@ -147,10 +125,7 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R
let (held_tx, held_rx) = watch::channel(false);
tokio::spawn(async move {
if let Err(e) = k2v_lock_loop_internal(k2v, pk, sk, &held_tx).await {
error!("Error in k2v locking loop: {}", e);
let _ = held_tx.send(false);
let _ = k2v_lock_loop_internal(k2v, pk, sk, held_tx).await;
@ -160,9 +135,68 @@ async fn k2v_lock_loop_internal(
k2v: K2vClient,
pk: &'static str,
sk: &'static str,
held_tx: &watch::Sender<bool>,
) -> Result<()> {
held_tx: watch::Sender<bool>,
) -> std::result::Result<(), watch::error::SendError<bool>> {
let pid = gen_ident();
let mut state: Option<(UniqueIdent, u64, CausalityToken)> = None;
loop {
let held_until = match &state {
None => None,
Some((_holder, expiration_time, _ct)) => Some(expiration_time),
let now = now_msec();
let wait_half_held_time = async {
match held_until {
None => futures::future::pending().await,
Some(t) => tokio::time::sleep(Duration::from_millis((now_msec() - t) / 2)).await,
tokio::select! {
ret = k2v_wait_value_changed(&k2v, pk, sk, &state.as_ref().map(|(_, _, ct)| ct.clone())) => {
match ret {
Err(e) => {
Ok(cv) => {
async fn k2v_wait_value_changed<'a>(
k2v: &'a K2vClient,
pk: &'static str,
sk: &'static str,
prev_ct: &'a Option<CausalityToken>,
) -> Result<CausalValue> {
loop {
if let Some(ct) = prev_ct {
match k2v.poll_item(pk, sk, ct.clone(), None).await? {
None => continue,
Some(cv) => return Ok(cv),
} else {
match k2v.read_item(pk, sk).await {
Err(k2v_client::Error::NotFound) => {
k2v.insert_item(pk, sk, vec![0u8], None).await?;
Err(e) => return Err(e.into()),
Ok(cv) => return Ok(cv),
// ----