in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
2 changed files with 32 additions and 88 deletions
Showing only changes of commit 652da6efd3 - Show all commits

View file

@ -1,11 +1,5 @@
use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient;
use k2v_client::{BatchReadOp, Filter, K2vValue};
use rusoto_s3::{
CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
@ -206,35 +200,18 @@ impl MailboxInternal {
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let ops = ids
.iter()
.map(|id| BatchReadOp {
partition_key: &self.mail_path,
filter: Filter {
start: Some(id),
end: None,
prefix: None,
limit: None,
reverse: false,
},
single_item: true,
conflicts_only: false,
tombstones: false,
})
.collect::<Vec<_>>();
let res_vec = self.k2v.read_batch(&ops).await?;
let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::<Vec<_>>();
let res_vec = self.k2v.select(storage::Selector::List(ops)).await?;
let mut meta_vec = vec![];
for (op, res) in ops.iter().zip(res_vec.into_iter()) {
if res.items.len() != 1 {
bail!("Expected 1 item, got {}", res.items.len());
}
let (_, cv) = res.items.iter().next().unwrap();
for res in res_vec.into_iter() {
let mut meta_opt = None;
for v in cv.value.iter() {
// Resolve conflicts
for v in res.content().iter() {
match v {
K2vValue::Tombstone => (),
K2vValue::Value(v) => {
storage::Alternative::Tombstone => (),
storage::Alternative::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
match meta_opt.as_mut() {
None => {
@ -250,7 +227,7 @@ impl MailboxInternal {
if let Some(meta) = meta_opt {
meta_vec.push(meta);
} else {
bail!("No valid meta value in k2v for {:?}", op.filter.start);
bail!("No valid meta value in k2v for {:?}", res.to_ref().sk());
}
}
@ -258,19 +235,9 @@ impl MailboxInternal {
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
let gor = GetObjectRequest {
bucket: self.bucket.clone(),
key: format!("{}/{}", self.mail_path, id),
..Default::default()
};
let obj_res = self.s3.get_object(gor).await?;
let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
obj_body.into_async_read().read_to_end(&mut buf).await?;
cryptoblob::open(&buf, message_key)
let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?;
let body = obj_res.content().ok_or(anyhow!("missing body"))?;
cryptoblob::open(body, message_key)
}
// ---- Functions for changing the mailbox ----
@ -303,13 +270,7 @@ impl MailboxInternal {
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
let por = PutObjectRequest {
bucket: self.bucket.clone(),
key: format!("{}/{}", self.mail_path, ident),
body: Some(message_blob.into()),
..Default::default()
};
self.s3.put_object(por).await?;
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
async {
@ -321,9 +282,7 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
.await?;
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@ -354,8 +313,8 @@ impl MailboxInternal {
futures::try_join!(
async {
// Copy mail body from previous location
let dst = self.s3.blob(format!("{}/{}", self.mail_path, ident));
blob_ref.copy(dst).await?;
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident));
blob_ref.copy(&dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
@ -367,9 +326,7 @@ impl MailboxInternal {
rfc822_size: mail.raw.len(),
};
let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
.await?;
self.k2v.row(&self.mail_path, &ident.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync()
@ -393,21 +350,13 @@ impl MailboxInternal {
futures::try_join!(
async {
// Delete mail body from S3
let dor = DeleteObjectRequest {
bucket: self.bucket.clone(),
key: format!("{}/{}", self.mail_path, ident),
..Default::default()
};
self.s3.delete_object(dor).await?;
self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
let v = self.k2v.read_item(&self.mail_path, &sk).await?;
self.k2v
.delete_item(&self.mail_path, &sk, v.causality)
.await?;
self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?;
Ok::<_, anyhow::Error>(())
}
)?;
@ -438,7 +387,7 @@ impl MailboxInternal {
source_id: UniqueIdent,
new_id: UniqueIdent,
) -> Result<()> {
if self.bucket != from.bucket || self.encryption_key != from.encryption_key {
if self.encryption_key != from.encryption_key {
bail!("Message to be copied/moved does not belong to same account.");
}
@ -453,24 +402,15 @@ impl MailboxInternal {
futures::try_join!(
async {
// Copy mail body from S3
let cor = CopyObjectRequest {
bucket: self.bucket.clone(),
key: format!("{}/{}", self.mail_path, new_id),
copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id),
..Default::default()
};
self.s3.copy_object(cor).await?;
let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id));
self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
.await?;
self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(meta_blob).push().await?;
Ok::<_, anyhow::Error>(())
},
self.uid_index.opportunistic_sync(),

View file

@ -14,17 +14,18 @@ use futures::future::BoxFuture;
pub mod in_memory;
pub mod garage;
pub enum Selector<'a> {
Range{ begin: &'a str, end: &'a str },
Filter(u64),
}
pub enum Alternative {
Tombstone,
Value(Vec<u8>),
}
type ConcurrentValues = Vec<Alternative>;
pub enum Selector<'a> {
Range { begin: &'a str, end: &'a str },
List (Vec<(&'a str, &'a str)>),
Prefix (&'a str),
}
#[derive(Debug)]
pub enum StorageError {
NotFound,
@ -78,12 +79,15 @@ impl Hash for Builders {
pub trait IRowStore
{
fn row(&self, partition: &str, sort: &str) -> RowRef;
fn select(&self, selector: Selector) -> AsyncResult<Vec<RowValue>>;
}
pub type RowStore = Box<dyn IRowStore + Sync + Send>;
pub trait IRowRef
{
fn clone_boxed(&self) -> RowRef;
fn pk(&self) -> &str;
fn sk(&self) -> &str;
fn set_value(&self, content: Vec<u8>) -> RowValue;
fn fetch(&self) -> AsyncResult<RowValue>;
fn rm(&self) -> AsyncResult<()>;