diff --git a/src/k2v_util.rs b/src/k2v_util.rs index 9dadab4..3cd969b 100644 --- a/src/k2v_util.rs +++ b/src/k2v_util.rs @@ -1,14 +1,10 @@ +/* use anyhow::Result; - -use k2v_client::{CausalValue, CausalityToken, K2vClient}; - // ---- UTIL: function to wait for a value to have changed in K2V ---- pub async fn k2v_wait_value_changed( - k2v: &K2vClient, - pk: &str, - sk: &str, - prev_ct: &Option, + k2v: &storage::RowStore, + key: &storage::RowRef, ) -> Result { loop { if let Some(ct) = prev_ct { @@ -27,3 +23,4 @@ pub async fn k2v_wait_value_changed( } } } +*/ diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 3ea7d6a..4e3fc8c 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -22,6 +22,7 @@ use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; +use crate::storage; use crate::time::now_msec; const INCOMING_PK: &str = "incoming"; @@ -60,18 +61,17 @@ async fn incoming_mail_watch_process_internal( let s3 = creds.blob_client()?; let mut inbox: Option> = None; - let mut prev_ct: Option = None; + let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); loop { - let new_mail = if *lock_held.borrow() { + let maybe_updated_incoming_key = if *lock_held.borrow() { info!("incoming lock held"); let wait_new_mail = async { loop { - match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct) - .await + match incoming_key.poll().await { - Ok(cv) => break cv, + Ok(row_val) => break row_val.to_ref(), Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -81,10 +81,10 @@ async fn incoming_mail_watch_process_internal( }; tokio::select! { - cv = wait_new_mail => Some(cv.causality), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, } } else { info!("incoming lock not held"); @@ -123,10 +123,10 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail - if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { Ok(()) => { - prev_ct = Some(new_ct); + incoming_key = updated_incoming_key; } Err(e) => { error!("Could not fetch incoming mail: {}", e); @@ -141,27 +141,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc, - s3: &S3Client, + blobs: &storage::BlobStore, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { - let lor = ListObjectsV2Request { - bucket: user.creds.storage.bucket.clone(), - max_keys: Some(1000), - prefix: Some("incoming/".into()), - ..Default::default() - }; - let mails_res = s3.list_objects_v2(lor).await?; + let mails_res = blobs.list("incoming/").await?; - for object in mails_res.contents.unwrap_or_default() { + for object in mails_res { if !*lock_held.borrow() { break; } - if let Some(key) = object.key { - if let Some(mail_id) = key.strip_prefix("incoming/") { - if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, s3, inbox, mail_id).await?; - } + let key = object.key(); + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::() { + move_incoming_message(user, blobs, inbox, mail_id).await?; } } } @@ -171,7 +164,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc, - s3: &S3Client, + s3: &storage::BlobStore, inbox: &Arc, id: UniqueIdent, ) -> Result<()> { @@ -180,20 +173,12 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let gor = GetObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - let get_result = s3.get_object(gor).await?; + let object = s3.blob(&object_key).fetch().await?; // 1.a decrypt message key from headers - info!("Object metadata: {:?}", get_result.metadata); - let key_encrypted_b64 = get_result - .metadata - .as_ref() - .ok_or(anyhow!("Missing key in metadata"))? - .get(MESSAGE_KEY) + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .get_meta(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; let key_encrypted = base64::decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( @@ -206,13 +191,8 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; - let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); - obj_body - .into_async_read() - .read_to_end(&mut mail_buf) - .await?; - let plain_mail = cryptoblob::open(&mail_buf, &message_key) + let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox @@ -222,19 +202,14 @@ async fn move_incoming_message( .await?; // 3 delete from incoming - let dor = DeleteObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - s3.delete_object(dor).await?; + object.to_ref().rm().await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver { +fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver { let (held_tx, held_rx) = watch::channel(false); tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); @@ -250,7 +225,7 @@ enum LockState { } async fn k2v_lock_loop_internal( - k2v: K2vClient, + k2v: storage::RowStore, pk: &'static str, sk: &'static str, held_tx: watch::Sender, diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 614382e..581f432 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -49,10 +49,9 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, - bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.storage.builders.row_store()?, - s3: creds.storage.builders.blob_store()?, + k2v: creds.storage.row_store()?, + s3: creds.storage.blob_store()?, uid_index, mail_path, }); @@ -183,7 +182,6 @@ struct MailboxInternal { // 2023-05-15 will probably be used later. #[allow(dead_code)] id: UniqueIdent, - bucket: String, mail_path: String, encryption_key: Key, diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 595a57c..46da4aa 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -27,6 +27,10 @@ impl IRowStore for GrgStore { } impl IRowRef for GrgRef { + fn clone_boxed(&self) -> RowRef { + unimplemented!(); + } + fn set_value(&self, content: Vec) -> RowValue { unimplemented!(); } @@ -36,7 +40,7 @@ impl IRowRef for GrgRef { fn rm(&self) -> AsyncResult<()> { unimplemented!(); } - fn poll(&self) -> AsyncResult> { + fn poll(&self) -> AsyncResult { unimplemented!(); } } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 19b55b9..144a52f 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -28,6 +28,10 @@ impl IRowStore for MemStore { } impl IRowRef for MemRef { + fn clone_boxed(&self) -> RowRef { + unimplemented!(); + } + fn set_value(&self, content: Vec) -> RowValue { unimplemented!(); } @@ -37,9 +41,10 @@ impl IRowRef for MemRef { fn rm(&self) -> AsyncResult<()> { unimplemented!(); } - fn poll(&self) -> AsyncResult> { + fn poll(&self) -> AsyncResult { async { - Ok(None) + let rv: RowValue = Box::new(MemValue{}); + Ok(rv) }.boxed() } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3e66e84..c5ed1f8 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -83,12 +83,18 @@ pub type RowStore = Box; pub trait IRowRef { + fn clone_boxed(&self) -> RowRef; fn set_value(&self, content: Vec) -> RowValue; fn fetch(&self) -> AsyncResult; fn rm(&self) -> AsyncResult<()>; - fn poll(&self) -> AsyncResult>; + fn poll(&self) -> AsyncResult; } pub type RowRef = Box; +impl Clone for RowRef { + fn clone(&self) -> Self { + return self.clone_boxed() + } +} pub trait IRowValue { @@ -101,14 +107,15 @@ pub type RowValue = Box; // ------- Blob pub trait IBlobStore { - fn new_blob(&self, key: &str) -> BlobRef; - fn list(&self) -> AsyncResult>; + fn blob(&self, key: &str) -> BlobRef; + fn list(&self, prefix: &str) -> AsyncResult>; } pub type BlobStore = Box; pub trait IBlobRef { fn set_value(&self, content: Vec) -> BlobValue; + fn key(&self) -> &str; fn fetch(&self) -> AsyncResult; fn copy(&self, dst: &BlobRef) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>; @@ -117,6 +124,8 @@ pub type BlobRef = Box; pub trait IBlobValue { fn to_ref(&self) -> BlobRef; + fn get_meta(&self, key: &str) -> Option<&[u8]>; + fn content(&self) -> Option<&[u8]>; fn push(&self) -> AsyncResult<()>; } pub type BlobValue = Box;