in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
6 changed files with 58 additions and 70 deletions
Showing only changes of commit a65f5b2589 - Show all commits

View file

@ -1,14 +1,10 @@
/*
use anyhow::Result; use anyhow::Result;
use k2v_client::{CausalValue, CausalityToken, K2vClient};
// ---- UTIL: function to wait for a value to have changed in K2V ---- // ---- UTIL: function to wait for a value to have changed in K2V ----
pub async fn k2v_wait_value_changed( pub async fn k2v_wait_value_changed(
k2v: &K2vClient, k2v: &storage::RowStore,
pk: &str, key: &storage::RowRef,
sk: &str,
prev_ct: &Option<CausalityToken>,
) -> Result<CausalValue> { ) -> Result<CausalValue> {
loop { loop {
if let Some(ct) = prev_ct { if let Some(ct) = prev_ct {
@ -27,3 +23,4 @@ pub async fn k2v_wait_value_changed(
} }
} }
} }
*/

View file

@ -22,6 +22,7 @@ use crate::mail::uidindex::ImapUidvalidity;
use crate::mail::unique_ident::*; use crate::mail::unique_ident::*;
use crate::mail::user::User; use crate::mail::user::User;
use crate::mail::IMF; use crate::mail::IMF;
use crate::storage;
use crate::time::now_msec; use crate::time::now_msec;
const INCOMING_PK: &str = "incoming"; const INCOMING_PK: &str = "incoming";
@ -60,18 +61,17 @@ async fn incoming_mail_watch_process_internal(
let s3 = creds.blob_client()?; let s3 = creds.blob_client()?;
let mut inbox: Option<Arc<Mailbox>> = None; let mut inbox: Option<Arc<Mailbox>> = None;
let mut prev_ct: Option<CausalityToken> = None; let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK);
loop { loop {
let new_mail = if *lock_held.borrow() { let maybe_updated_incoming_key = if *lock_held.borrow() {
info!("incoming lock held"); info!("incoming lock held");
let wait_new_mail = async { let wait_new_mail = async {
loop { loop {
match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct) match incoming_key.poll().await
.await
{ {
Ok(cv) => break cv, Ok(row_val) => break row_val.to_ref(),
Err(e) => { Err(e) => {
error!("Error in wait_new_mail: {}", e); error!("Error in wait_new_mail: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
@ -81,8 +81,8 @@ async fn incoming_mail_watch_process_internal(
}; };
tokio::select! { tokio::select! {
cv = wait_new_mail => Some(cv.causality), inc_k = wait_new_mail => Some(inc_k),
_ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()),
_ = lock_held.changed() => None, _ = lock_held.changed() => None,
_ = rx_inbox_id.changed() => None, _ = rx_inbox_id.changed() => None,
} }
@ -123,10 +123,10 @@ async fn incoming_mail_watch_process_internal(
// If we were able to open INBOX, and we have mail, // If we were able to open INBOX, and we have mail,
// fetch new mail // fetch new mail
if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) {
match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { match handle_incoming_mail(&user, &s3, inbox, &lock_held).await {
Ok(()) => { Ok(()) => {
prev_ct = Some(new_ct); incoming_key = updated_incoming_key;
} }
Err(e) => { Err(e) => {
error!("Could not fetch incoming mail: {}", e); error!("Could not fetch incoming mail: {}", e);
@ -141,27 +141,20 @@ async fn incoming_mail_watch_process_internal(
async fn handle_incoming_mail( async fn handle_incoming_mail(
user: &Arc<User>, user: &Arc<User>,
s3: &S3Client, blobs: &storage::BlobStore,
inbox: &Arc<Mailbox>, inbox: &Arc<Mailbox>,
lock_held: &watch::Receiver<bool>, lock_held: &watch::Receiver<bool>,
) -> Result<()> { ) -> Result<()> {
let lor = ListObjectsV2Request { let mails_res = blobs.list("incoming/").await?;
bucket: user.creds.storage.bucket.clone(),
max_keys: Some(1000),
prefix: Some("incoming/".into()),
..Default::default()
};
let mails_res = s3.list_objects_v2(lor).await?;
for object in mails_res.contents.unwrap_or_default() { for object in mails_res {
if !*lock_held.borrow() { if !*lock_held.borrow() {
break; break;
} }
if let Some(key) = object.key { let key = object.key();
if let Some(mail_id) = key.strip_prefix("incoming/") { if let Some(mail_id) = key.strip_prefix("incoming/") {
if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() {
move_incoming_message(user, s3, inbox, mail_id).await?; move_incoming_message(user, blobs, inbox, mail_id).await?;
}
} }
} }
} }
@ -171,7 +164,7 @@ async fn handle_incoming_mail(
async fn move_incoming_message( async fn move_incoming_message(
user: &Arc<User>, user: &Arc<User>,
s3: &S3Client, s3: &storage::BlobStore,
inbox: &Arc<Mailbox>, inbox: &Arc<Mailbox>,
id: UniqueIdent, id: UniqueIdent,
) -> Result<()> { ) -> Result<()> {
@ -180,20 +173,12 @@ async fn move_incoming_message(
let object_key = format!("incoming/{}", id); let object_key = format!("incoming/{}", id);
// 1. Fetch message from S3 // 1. Fetch message from S3
let gor = GetObjectRequest { let object = s3.blob(&object_key).fetch().await?;
bucket: user.creds.storage.bucket.clone(),
key: object_key.clone(),
..Default::default()
};
let get_result = s3.get_object(gor).await?;
// 1.a decrypt message key from headers // 1.a decrypt message key from headers
info!("Object metadata: {:?}", get_result.metadata); //info!("Object metadata: {:?}", get_result.metadata);
let key_encrypted_b64 = get_result let key_encrypted_b64 = object
.metadata .get_meta(MESSAGE_KEY)
.as_ref()
.ok_or(anyhow!("Missing key in metadata"))?
.get(MESSAGE_KEY)
.ok_or(anyhow!("Missing key in metadata"))?; .ok_or(anyhow!("Missing key in metadata"))?;
let key_encrypted = base64::decode(key_encrypted_b64)?; let key_encrypted = base64::decode(key_encrypted_b64)?;
let message_key = sodiumoxide::crypto::sealedbox::open( let message_key = sodiumoxide::crypto::sealedbox::open(
@ -206,13 +191,8 @@ async fn move_incoming_message(
cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?;
// 1.b retrieve message body // 1.b retrieve message body
let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; let obj_body = object.content().ok_or(anyhow!("Missing object body"))?;
let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); let plain_mail = cryptoblob::open(&obj_body, &message_key)
obj_body
.into_async_read()
.read_to_end(&mut mail_buf)
.await?;
let plain_mail = cryptoblob::open(&mail_buf, &message_key)
.map_err(|_| anyhow!("Cannot decrypt email content"))?; .map_err(|_| anyhow!("Cannot decrypt email content"))?;
// 2 parse mail and add to inbox // 2 parse mail and add to inbox
@ -222,19 +202,14 @@ async fn move_incoming_message(
.await?; .await?;
// 3 delete from incoming // 3 delete from incoming
let dor = DeleteObjectRequest { object.to_ref().rm().await?;
bucket: user.creds.storage.bucket.clone(),
key: object_key.clone(),
..Default::default()
};
s3.delete_object(dor).await?;
Ok(()) Ok(())
} }
// ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ----
fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> { fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> {
let (held_tx, held_rx) = watch::channel(false); let (held_tx, held_rx) = watch::channel(false);
tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx));
@ -250,7 +225,7 @@ enum LockState {
} }
async fn k2v_lock_loop_internal( async fn k2v_lock_loop_internal(
k2v: K2vClient, k2v: storage::RowStore,
pk: &'static str, pk: &'static str,
sk: &'static str, sk: &'static str,
held_tx: watch::Sender<bool>, held_tx: watch::Sender<bool>,

View file

@ -49,10 +49,9 @@ impl Mailbox {
let mbox = RwLock::new(MailboxInternal { let mbox = RwLock::new(MailboxInternal {
id, id,
bucket: creds.bucket().to_string(),
encryption_key: creds.keys.master.clone(), encryption_key: creds.keys.master.clone(),
k2v: creds.storage.builders.row_store()?, k2v: creds.storage.row_store()?,
s3: creds.storage.builders.blob_store()?, s3: creds.storage.blob_store()?,
uid_index, uid_index,
mail_path, mail_path,
}); });
@ -183,7 +182,6 @@ struct MailboxInternal {
// 2023-05-15 will probably be used later. // 2023-05-15 will probably be used later.
#[allow(dead_code)] #[allow(dead_code)]
id: UniqueIdent, id: UniqueIdent,
bucket: String,
mail_path: String, mail_path: String,
encryption_key: Key, encryption_key: Key,

View file

@ -27,6 +27,10 @@ impl IRowStore for GrgStore {
} }
impl IRowRef for GrgRef { impl IRowRef for GrgRef {
fn clone_boxed(&self) -> RowRef {
unimplemented!();
}
fn set_value(&self, content: Vec<u8>) -> RowValue { fn set_value(&self, content: Vec<u8>) -> RowValue {
unimplemented!(); unimplemented!();
} }
@ -36,7 +40,7 @@ impl IRowRef for GrgRef {
fn rm(&self) -> AsyncResult<()> { fn rm(&self) -> AsyncResult<()> {
unimplemented!(); unimplemented!();
} }
fn poll(&self) -> AsyncResult<Option<RowValue>> { fn poll(&self) -> AsyncResult<RowValue> {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -28,6 +28,10 @@ impl IRowStore for MemStore {
} }
impl IRowRef for MemRef { impl IRowRef for MemRef {
fn clone_boxed(&self) -> RowRef {
unimplemented!();
}
fn set_value(&self, content: Vec<u8>) -> RowValue { fn set_value(&self, content: Vec<u8>) -> RowValue {
unimplemented!(); unimplemented!();
} }
@ -37,9 +41,10 @@ impl IRowRef for MemRef {
fn rm(&self) -> AsyncResult<()> { fn rm(&self) -> AsyncResult<()> {
unimplemented!(); unimplemented!();
} }
fn poll(&self) -> AsyncResult<Option<RowValue>> { fn poll(&self) -> AsyncResult<RowValue> {
async { async {
Ok(None) let rv: RowValue = Box::new(MemValue{});
Ok(rv)
}.boxed() }.boxed()
} }
} }

View file

@ -83,12 +83,18 @@ pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef pub trait IRowRef
{ {
fn clone_boxed(&self) -> RowRef;
fn set_value(&self, content: Vec<u8>) -> RowValue; fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>; fn fetch(&self) -> AsyncResult<RowValue>;
fn rm(&self) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>;
fn poll(&self) -> AsyncResult<Option<RowValue>>; fn poll(&self) -> AsyncResult<RowValue>;
} }
pub type RowRef = Box<dyn IRowRef + Send + Sync>; pub type RowRef = Box<dyn IRowRef + Send + Sync>;
impl Clone for RowRef {
fn clone(&self) -> Self {
return self.clone_boxed()
}
}
pub trait IRowValue pub trait IRowValue
{ {
@ -101,14 +107,15 @@ pub type RowValue = Box<dyn IRowValue + Send + Sync>;
// ------- Blob // ------- Blob
pub trait IBlobStore pub trait IBlobStore
{ {
fn new_blob(&self, key: &str) -> BlobRef; fn blob(&self, key: &str) -> BlobRef;
fn list(&self) -> AsyncResult<Vec<BlobRef>>; fn list(&self, prefix: &str) -> AsyncResult<Vec<BlobRef>>;
} }
pub type BlobStore = Box<dyn IBlobStore + Send + Sync>; pub type BlobStore = Box<dyn IBlobStore + Send + Sync>;
pub trait IBlobRef pub trait IBlobRef
{ {
fn set_value(&self, content: Vec<u8>) -> BlobValue; fn set_value(&self, content: Vec<u8>) -> BlobValue;
fn key(&self) -> &str;
fn fetch(&self) -> AsyncResult<BlobValue>; fn fetch(&self) -> AsyncResult<BlobValue>;
fn copy(&self, dst: &BlobRef) -> AsyncResult<()>; fn copy(&self, dst: &BlobRef) -> AsyncResult<()>;
fn rm(&self) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>;
@ -117,6 +124,8 @@ pub type BlobRef = Box<dyn IBlobRef + Send + Sync>;
pub trait IBlobValue { pub trait IBlobValue {
fn to_ref(&self) -> BlobRef; fn to_ref(&self) -> BlobRef;
fn get_meta(&self, key: &str) -> Option<&[u8]>;
fn content(&self) -> Option<&[u8]>;
fn push(&self) -> AsyncResult<()>; fn push(&self) -> AsyncResult<()>;
} }
pub type BlobValue = Box<dyn IBlobValue + Send + Sync>; pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;