Handling of incoming messages from LMTP seems to work!

This commit is contained in:
Alex 2022-07-04 12:44:48 +02:00
parent 9b4fcf58df
commit 1abfb60b8e
Signed by: lx
GPG key ID: 0E496D15096376BE
2 changed files with 89 additions and 42 deletions

View file

@ -8,7 +8,8 @@ use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, Future, FutureExt}; use futures::{future::BoxFuture, Future, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{ use rusoto_s3::{
GetObjectRequest, HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, ListObjectsV2Request,
PutObjectRequest, S3Client, S3,
}; };
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::watch; use tokio::sync::watch;
@ -27,7 +28,7 @@ const INCOMING_PK: &str = "incoming";
const INCOMING_LOCK_SK: &str = "lock"; const INCOMING_LOCK_SK: &str = "lock";
const INCOMING_WATCH_SK: &str = "watch"; const INCOMING_WATCH_SK: &str = "watch";
const MESSAGE_KEY: &str = "Message-Key"; const MESSAGE_KEY: &str = "message-key";
// When a lock is held, it is held for LOCK_DURATION (here 5 minutes) // When a lock is held, it is held for LOCK_DURATION (here 5 minutes)
// It is renewed every LOCK_DURATION/3 // It is renewed every LOCK_DURATION/3
@ -183,6 +184,7 @@ async fn move_incoming_message(
let get_result = s3.get_object(gor).await?; let get_result = s3.get_object(gor).await?;
// 1.a decrypt message key from headers // 1.a decrypt message key from headers
info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = get_result let key_encrypted_b64 = get_result
.metadata .metadata
.as_ref() .as_ref()
@ -215,6 +217,12 @@ async fn move_incoming_message(
.append_from_s3(msg, id, &object_key, message_key) .append_from_s3(msg, id, &object_key, message_key)
.await?; .await?;
// 3 delete from incoming
let mut dor = DeleteObjectRequest::default();
dor.bucket = user.creds.storage.bucket.clone();
dor.key = object_key.clone();
s3.delete_object(dor).await?;
Ok(()) Ok(())
} }
@ -250,6 +258,7 @@ async fn k2v_lock_loop_internal(
let watch_lock_loop: BoxFuture<Result<()>> = async { let watch_lock_loop: BoxFuture<Result<()>> = async {
let mut ct = None; let mut ct = None;
loop { loop {
info!("k2v watch lock loop iter: ct = {:?}", ct);
match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { match k2v_wait_value_changed(&k2v, pk, sk, &ct).await {
Err(e) => { Err(e) => {
error!( error!(
@ -275,6 +284,10 @@ async fn k2v_lock_loop_internal(
} }
} }
} }
info!(
"k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}",
ct, cv.causality, lock_state
);
state_tx.send( state_tx.send(
lock_state lock_state
.map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone()))
@ -283,8 +296,8 @@ async fn k2v_lock_loop_internal(
ct = Some(cv.causality); ct = Some(cv.causality);
} }
} }
info!("Stopping lock state watch");
} }
info!("Stopping lock state watch");
} }
.boxed(); .boxed();
@ -303,7 +316,10 @@ async fn k2v_lock_loop_internal(
} }
_ => None, _ => None,
}; };
held_tx.send(held_with_expiration_time.is_some())?; let held = held_with_expiration_time.is_some();
if held != *held_tx.borrow() {
held_tx.send(held)?;
}
let await_expired = async { let await_expired = async {
match held_with_expiration_time { match held_with_expiration_time {
@ -360,7 +376,9 @@ async fn k2v_lock_loop_internal(
// Acquire lock // Acquire lock
let mut lock = vec![0u8; 32]; let mut lock = vec![0u8; 32];
lock[..8].copy_from_slice(&u64::to_be_bytes(now_msec())); lock[..8].copy_from_slice(&u64::to_be_bytes(
now_msec() + LOCK_DURATION.as_millis() as u64,
));
lock[8..].copy_from_slice(&our_pid.0); lock[8..].copy_from_slice(&our_pid.0);
if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await {
error!("Could not take lock: {}", e); error!("Could not take lock: {}", e);

View file

@ -207,20 +207,32 @@ impl MailboxInternal {
let res_vec = self.k2v.read_batch(&ops).await?; let res_vec = self.k2v.read_batch(&ops).await?;
let mut meta_vec = vec![]; let mut meta_vec = vec![];
for res in res_vec { for (op, res) in ops.iter().zip(res_vec.into_iter()) {
if res.items.len() != 1 { if res.items.len() != 1 {
bail!("Expected 1 item, got {}", res.items.len()); bail!("Expected 1 item, got {}", res.items.len());
} }
let (_, cv) = res.items.iter().next().unwrap(); let (_, cv) = res.items.iter().next().unwrap();
if cv.value.len() != 1 { let mut meta_opt = None;
bail!("Expected 1 value, got {}", cv.value.len()); for v in cv.value.iter() {
} match v {
match &cv.value[0] { K2vValue::Tombstone => (),
K2vValue::Tombstone => bail!("Expected value, got tombstone"),
K2vValue::Value(v) => { K2vValue::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?; let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
meta_vec.push(meta); match meta_opt.as_mut() {
None => {
meta_opt = Some(meta);
} }
Some(prevmeta) => {
prevmeta.try_merge(meta)?;
}
}
}
}
}
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
bail!("No valid meta value in k2v for {:?}", op.filter.start);
} }
} }
@ -429,15 +441,18 @@ impl MailboxInternal {
// ---- // ----
async fn test(&mut self) -> Result<()> { async fn test(&mut self) -> Result<()> {
Ok(())
/*
self.uid_index.sync().await?; self.uid_index.sync().await?;
dump(&self.uid_index); dump(&self.uid_index);
let mail = br#"From: Garage team <garagehq@deuxfleurs.fr> let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
Subject: Welcome to Aerogramme!! Subject: Welcome to Aerogramme!!
This is just a test email, feel free to ignore. This is just a test email, feel free to ignore.
"#; "#;
let mail = IMF::try_from(&mail[..]).unwrap(); let mail = IMF::try_from(&mail[..]).unwrap();
self.append(mail, None).await?; self.append(mail, None).await?;
@ -461,6 +476,7 @@ This is just a test email, feel free to ignore.
} }
Ok(()) Ok(())
*/
} }
} }
@ -496,3 +512,16 @@ pub struct MailMeta {
/// RFC822 size /// RFC822 size
pub rfc822_size: usize, pub rfc822_size: usize,
} }
impl MailMeta {
fn try_merge(&mut self, other: Self) -> Result<()> {
if self.headers != other.headers
|| self.message_key != other.message_key
|| self.rfc822_size != other.rfc822_size
{
bail!("Conflicting MailMeta values.");
}
self.internaldate = std::cmp::max(self.internaldate, other.internaldate);
Ok(())
}
}