diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 83039d5..e8111df 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,11 +1,5 @@ use anyhow::{anyhow, bail, Result}; -use k2v_client::K2vClient; -use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, -}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; @@ -206,35 +200,18 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids - .iter() - .map(|id| BatchReadOp { - partition_key: &self.mail_path, - filter: Filter { - start: Some(id), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - single_item: true, - conflicts_only: false, - tombstones: false, - }) - .collect::>(); - let res_vec = self.k2v.read_batch(&ops).await?; + let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::>(); + let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; let mut meta_vec = vec![]; - for (op, res) in ops.iter().zip(res_vec.into_iter()) { - if res.items.len() != 1 { - bail!("Expected 1 item, got {}", res.items.len()); - } - let (_, cv) = res.items.iter().next().unwrap(); + for res in res_vec.into_iter() { let mut meta_opt = None; - for v in cv.value.iter() { + + // Resolve conflicts + for v in res.content().iter() { match v { - K2vValue::Tombstone => (), - K2vValue::Value(v) => { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { let meta = open_deserialize::(v, &self.encryption_key)?; match meta_opt.as_mut() { None => { @@ -250,7 +227,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", op.filter.start); + bail!("No valid meta value in k2v for {:?}", res.to_ref().sk()); } } @@ -258,19 +235,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, id), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - - cryptoblob::open(&buf, message_key) + let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; + let body = obj_res.content().ok_or(anyhow!("missing body"))?; + cryptoblob::open(body, message_key) } // ---- Functions for changing the mailbox ---- @@ -303,13 +270,7 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - body: Some(message_blob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; Ok::<_, anyhow::Error>(()) }, async { @@ -321,9 +282,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -354,8 +313,8 @@ impl MailboxInternal { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(dst).await?; + let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); + blob_ref.copy(&dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -367,9 +326,7 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -393,21 +350,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let v = self.k2v.read_item(&self.mail_path, &sk).await?; - self.k2v - .delete_item(&self.mail_path, &sk, v.causality) - .await?; + self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; Ok::<_, anyhow::Error>(()) } )?; @@ -438,7 +387,7 @@ impl MailboxInternal { source_id: UniqueIdent, new_id: UniqueIdent, ) -> Result<()> { - if self.bucket != from.bucket || self.encryption_key != from.encryption_key { + if self.encryption_key != from.encryption_key { bail!("Message to be copied/moved does not belong to same account."); } @@ -453,24 +402,15 @@ impl MailboxInternal { futures::try_join!( async { - // Copy mail body from S3 - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, new_id), - copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id), - ..Default::default() - }; - - self.s3.copy_object(cor).await?; + let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); + self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) - .await?; + self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c5ed1f8..b687959 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -14,17 +14,18 @@ use futures::future::BoxFuture; pub mod in_memory; pub mod garage; -pub enum Selector<'a> { - Range{ begin: &'a str, end: &'a str }, - Filter(u64), -} - pub enum Alternative { Tombstone, Value(Vec), } type ConcurrentValues = Vec; +pub enum Selector<'a> { + Range { begin: &'a str, end: &'a str }, + List (Vec<(&'a str, &'a str)>), + Prefix (&'a str), +} + #[derive(Debug)] pub enum StorageError { NotFound, @@ -78,12 +79,15 @@ impl Hash for Builders { pub trait IRowStore { fn row(&self, partition: &str, sort: &str) -> RowRef; + fn select(&self, selector: Selector) -> AsyncResult>; } pub type RowStore = Box; pub trait IRowRef { fn clone_boxed(&self) -> RowRef; + fn pk(&self) -> &str; + fn sk(&self) -> &str; fn set_value(&self, content: Vec) -> RowValue; fn fetch(&self) -> AsyncResult; fn rm(&self) -> AsyncResult<()>;