in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
5 changed files with 28 additions and 26 deletions
Showing only changes of commit 1e192f93d5 - Show all commits

View file

@ -54,10 +54,10 @@ async fn incoming_mail_watch_process_internal(
creds: Credentials, creds: Credentials,
mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>,
) -> Result<()> { ) -> Result<()> {
let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK);
let k2v = creds.k2v_client()?; let k2v = creds.row_client()?;
let s3 = creds.s3_client()?; let s3 = creds.blob_client()?;
let mut inbox: Option<Arc<Mailbox>> = None; let mut inbox: Option<Arc<Mailbox>> = None;
let mut prev_ct: Option<CausalityToken> = None; let mut prev_ct: Option<CausalityToken> = None;

View file

@ -34,7 +34,7 @@ const MAILBOX_LIST_SK: &str = "list";
pub struct User { pub struct User {
pub username: String, pub username: String,
pub creds: Credentials, pub creds: Credentials,
pub k2v: K2vClient, pub k2v: storage::RowStore,
pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>, pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>,
tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>, tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>,
@ -174,7 +174,7 @@ impl User {
// ---- Internal user & mailbox management ---- // ---- Internal user & mailbox management ----
async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> { async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> {
let k2v = creds.k2v_client()?; let k2v = creds.row_client()?;
let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let (tx_inbox_id, rx_inbox_id) = watch::channel(None);
@ -224,32 +224,32 @@ impl User {
// ---- Mailbox list management ---- // ---- Mailbox list management ----
async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> { async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> {
let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await {
Err(k2v_client::Error::NotFound) => (MailboxList::new(), None), Err(storage::StorageError::NotFound) => (MailboxList::new(), None),
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
Ok(cv) => { Ok(rv) => {
let mut list = MailboxList::new(); let mut list = MailboxList::new();
for v in cv.value { for v in rv.content() {
if let K2vValue::Value(vbytes) = v { if let storage::Alternative::Value(vbytes) = v {
let list2 = let list2 =
open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?; open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?;
list.merge(list2); list.merge(list2);
} }
} }
(list, Some(cv.causality)) (list, Some(rv.to_ref()))
} }
}; };
self.ensure_inbox_exists(&mut list, &ct).await?; self.ensure_inbox_exists(&mut list, &row).await?;
Ok((list, ct)) Ok((list, row))
} }
async fn ensure_inbox_exists( async fn ensure_inbox_exists(
&self, &self,
list: &mut MailboxList, list: &mut MailboxList,
ct: &Option<CausalityToken>, ct: &Option<storage::RowRef>,
) -> Result<bool> { ) -> Result<bool> {
// If INBOX doesn't exist, create a new mailbox with that name // If INBOX doesn't exist, create a new mailbox with that name
// and save new mailbox list. // and save new mailbox list.
@ -278,12 +278,14 @@ impl User {
async fn save_mailbox_list( async fn save_mailbox_list(
&self, &self,
list: &MailboxList, list: &MailboxList,
ct: Option<CausalityToken>, ct: Option<storage::RowRef>,
) -> Result<()> { ) -> Result<()> {
let list_blob = seal_serialize(list, &self.creds.keys.master)?; let list_blob = seal_serialize(list, &self.creds.keys.master)?;
self.k2v let rref = match ct {
.insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct) Some(x) => x,
.await?; None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK),
};
rref.set_value(list_blob).push().await?;
Ok(()) Ok(())
} }
} }

View file

@ -21,7 +21,7 @@ impl IBuilders for GrgCreds {
} }
impl IRowStore for GrgStore { impl IRowStore for GrgStore {
fn new_row(&self, partition: &str, sort: &str) -> RowRef { fn row(&self, partition: &str, sort: &str) -> RowRef {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -22,7 +22,7 @@ impl IBuilders for FullMem {
} }
impl IRowStore for MemStore { impl IRowStore for MemStore {
fn new_row(&self, partition: &str, sort: &str) -> RowRef { fn row(&self, partition: &str, sort: &str) -> RowRef {
unimplemented!(); unimplemented!();
} }
} }

View file

@ -77,7 +77,7 @@ impl Hash for Builders {
// ------ Row // ------ Row
pub trait IRowStore pub trait IRowStore
{ {
fn new_row(&self, partition: &str, sort: &str) -> RowRef; fn row(&self, partition: &str, sort: &str) -> RowRef;
} }
pub type RowStore = Box<dyn IRowStore + Sync + Send>; pub type RowStore = Box<dyn IRowStore + Sync + Send>;
@ -88,7 +88,7 @@ pub trait IRowRef
fn rm(&self) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>;
fn poll(&self) -> AsyncResult<Option<RowValue>>; fn poll(&self) -> AsyncResult<Option<RowValue>>;
} }
pub type RowRef = Box<dyn IRowRef>; pub type RowRef = Box<dyn IRowRef + Send + Sync>;
pub trait IRowValue pub trait IRowValue
{ {
@ -96,7 +96,7 @@ pub trait IRowValue
fn content(&self) -> ConcurrentValues; fn content(&self) -> ConcurrentValues;
fn push(&self) -> AsyncResult<()>; fn push(&self) -> AsyncResult<()>;
} }
pub type RowValue = Box<dyn IRowValue>; pub type RowValue = Box<dyn IRowValue + Send + Sync>;
// ------- Blob // ------- Blob
pub trait IBlobStore pub trait IBlobStore
@ -113,10 +113,10 @@ pub trait IBlobRef
fn copy(&self, dst: &BlobRef) -> AsyncResult<()>; fn copy(&self, dst: &BlobRef) -> AsyncResult<()>;
fn rm(&self) -> AsyncResult<()>; fn rm(&self) -> AsyncResult<()>;
} }
pub type BlobRef = Box<dyn IBlobRef>; pub type BlobRef = Box<dyn IBlobRef + Send + Sync>;
pub trait IBlobValue { pub trait IBlobValue {
fn to_ref(&self) -> BlobRef; fn to_ref(&self) -> BlobRef;
fn push(&self) -> AsyncResult<()>; fn push(&self) -> AsyncResult<()>;
} }
pub type BlobValue = Box<dyn IBlobValue>; pub type BlobValue = Box<dyn IBlobValue + Send + Sync>;