diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 86cbc07..3a1e4f9 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, Future, FutureExt}; use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use rusoto_s3::{ - GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, + DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, + PutObjectRequest, S3Client, S3, }; use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -27,7 +28,7 @@ const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_WATCH_SK: &str = "watch"; -const MESSAGE_KEY: &str = "Message-Key"; +const MESSAGE_KEY: &str = "message-key"; // When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // It is renewed every LOCK_DURATION/3 @@ -183,6 +184,7 @@ async fn move_incoming_message( let get_result = s3.get_object(gor).await?; // 1.a decrypt message key from headers + info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = get_result .metadata .as_ref() @@ -215,6 +217,12 @@ async fn move_incoming_message( .append_from_s3(msg, id, &object_key, message_key) .await?; + // 3 delete from incoming + let mut dor = DeleteObjectRequest::default(); + dor.bucket = user.creds.storage.bucket.clone(); + dor.key = object_key.clone(); + s3.delete_object(dor).await?; + Ok(()) } @@ -250,6 +258,7 @@ async fn k2v_lock_loop_internal( let watch_lock_loop: BoxFuture> = async { let mut ct = None; loop { + info!("k2v watch lock loop iter: ct = {:?}", ct); match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { Err(e) => { error!( @@ -275,6 +284,10 @@ async fn k2v_lock_loop_internal( } } } + info!( + "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", + ct, cv.causality, lock_state + ); state_tx.send( lock_state .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) @@ -283,8 +296,8 @@ async fn k2v_lock_loop_internal( ct = Some(cv.causality); } } - info!("Stopping lock state watch"); } + info!("Stopping lock state watch"); } .boxed(); @@ -303,7 +316,10 @@ async fn k2v_lock_loop_internal( } _ => None, }; - held_tx.send(held_with_expiration_time.is_some())?; + let held = held_with_expiration_time.is_some(); + if held != *held_tx.borrow() { + held_tx.send(held)?; + } let await_expired = async { match held_with_expiration_time { @@ -360,7 +376,9 @@ async fn k2v_lock_loop_internal( // Acquire lock let mut lock = vec![0u8; 32]; - lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec())); + lock[..8].copy_from_slice(&u64::to_be_bytes( + now_msec() + LOCK_DURATION.as_millis() as u64, + )); lock[8..].copy_from_slice(&our_pid.0); if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { error!("Could not take lock: {}", e); diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index e63f712..7389173 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -207,21 +207,33 @@ impl MailboxInternal { let res_vec = self.k2v.read_batch(&ops).await?; let mut meta_vec = vec![]; - for res in res_vec { + for (op, res) in ops.iter().zip(res_vec.into_iter()) { if res.items.len() != 1 { bail!("Expected 1 item, got {}", res.items.len()); } let (_, cv) = res.items.iter().next().unwrap(); - if cv.value.len() != 1 { - bail!("Expected 1 value, got {}", cv.value.len()); - } - match &cv.value[0] { - K2vValue::Tombstone => bail!("Expected value, got tombstone"), - K2vValue::Value(v) => { - let meta = open_deserialize::(v, &self.encryption_key)?; - meta_vec.push(meta); + let mut meta_opt = None; + for v in cv.value.iter() { + match v { + K2vValue::Tombstone => (), + K2vValue::Value(v) => { + let meta = open_deserialize::(v, &self.encryption_key)?; + match meta_opt.as_mut() { + None => { + meta_opt = Some(meta); + } + Some(prevmeta) => { + prevmeta.try_merge(meta)?; + } + } + } } } + if let Some(meta) = meta_opt { + meta_vec.push(meta); + } else { + bail!("No valid meta value in k2v for {:?}", op.filter.start); + } } Ok(meta_vec) @@ -429,38 +441,42 @@ impl MailboxInternal { // ---- async fn test(&mut self) -> Result<()> { - self.uid_index.sync().await?; + Ok(()) - dump(&self.uid_index); - - let mail = br#"From: Garage team -Subject: Welcome to Aerogramme!! - -This is just a test email, feel free to ignore. -"#; - let mail = IMF::try_from(&mail[..]).unwrap(); - self.append(mail, None).await?; - - dump(&self.uid_index); - - if self.uid_index.state().idx_by_uid.len() > 6 { - for i in 0..2 { - let (_, ident) = self - .uid_index - .state() - .idx_by_uid - .iter() - .skip(3 + i) - .next() - .unwrap(); - - self.delete(*ident).await?; + /* + self.uid_index.sync().await?; dump(&self.uid_index); - } - } - Ok(()) + let mail = br#"From: Garage team + Subject: Welcome to Aerogramme!! + + This is just a test email, feel free to ignore. + "#; + let mail = IMF::try_from(&mail[..]).unwrap(); + self.append(mail, None).await?; + + dump(&self.uid_index); + + if self.uid_index.state().idx_by_uid.len() > 6 { + for i in 0..2 { + let (_, ident) = self + .uid_index + .state() + .idx_by_uid + .iter() + .skip(3 + i) + .next() + .unwrap(); + + self.delete(*ident).await?; + + dump(&self.uid_index); + } + } + + Ok(()) + */ } } @@ -496,3 +512,16 @@ pub struct MailMeta { /// RFC822 size pub rfc822_size: usize, } + +impl MailMeta { + fn try_merge(&mut self, other: Self) -> Result<()> { + if self.headers != other.headers + || self.message_key != other.message_key + || self.rfc822_size != other.rfc822_size + { + bail!("Conflicting MailMeta values."); + } + self.internaldate = std::cmp::max(self.internaldate, other.internaldate); + Ok(()) + } +}