From 3d41f40dc8cd6bdfa7a9279ab1959564d06eefaf Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 18 Dec 2023 17:09:44 +0100 Subject: [PATCH] Storage trait new implementation --- src/bayou.rs | 134 +++++++++------------ src/login/ldap_provider.rs | 14 +-- src/login/mod.rs | 12 +- src/login/static_provider.rs | 16 +-- src/mail/incoming.rs | 103 +++++++++-------- src/mail/mailbox.rs | 59 ++++++---- src/mail/user.rs | 68 ++++------- src/storage/garage.rs | 30 ++++- src/storage/in_memory.rs | 35 ++++-- src/storage/mod.rs | 217 +++++++++++++---------------------- 10 files changed, 315 insertions(+), 373 deletions(-) diff --git a/src/bayou.rs b/src/bayou.rs index 3042f94..afe3c75 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -45,8 +45,7 @@ pub struct Bayou { path: String, key: Key, - k2v: storage::RowStore, - s3: storage::BlobStore, + storage: storage::Store, checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option)>, @@ -60,17 +59,16 @@ pub struct Bayou { impl Bayou { pub fn new(creds: &Credentials, path: String) -> Result { - let k2v_client = creds.row_client()?; - let s3_client = creds.blob_client()?; + let storage = creds.storage.build()?; - let target = k2v_client.row(&path, WATCH_SK); - let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; + //let target = k2v_client.row(&path, WATCH_SK); + let target = storage::RowRef::new(&path, WATCH_SK); + let watch = K2vWatch::new(creds, target.clone())?; Ok(Self { path, + storage, key: creds.keys.master.clone(), - k2v: k2v_client, - s3: s3_client, checkpoint: (Timestamp::zero(), S::default()), history: vec![], last_sync: None, @@ -96,9 +94,7 @@ impl Bayou { } else { debug!("(sync) loading checkpoint: {}", key); - let obj_res = self.s3.blob(key).fetch().await?; - let buf = obj_res.content().ok_or(anyhow!("object can't be empty"))?; - + let buf = self.storage.blob_fetch(&storage::BlobRef(key.to_string())).await?.value; debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::(&buf, &self.key)?; @@ -129,42 +125,26 @@ impl Bayou { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); - let ops_map = self.k2v.select(storage::Selector::Range { shard_key: &self.path, begin: &ts_ser, end: WATCH_SK }).await?; - /*let ops_map = self - .k2v - .read_batch(&[BatchReadOp { - partition_key: &self.path, - filter: Filter { - start: Some(&ts_ser), - end: Some(WATCH_SK), - prefix: None, - limit: None, - reverse: false, - }, - single_item: false, - conflicts_only: false, - tombstones: false, - }]) - .await? - .into_iter() - .next() - .ok_or(anyhow!("Missing K2V result"))? - .items;*/ + let ops_map = self.storage.row_fetch(&storage::Selector::Range { + shard: &self.path, + sort_begin: &ts_ser, + sort_end: WATCH_SK + }).await?; let mut ops = vec![]; for row_value in ops_map { - let row = row_value.to_ref(); - let sort_key = row.key().1; + let row = row_value.row_ref; + let sort_key = row.uid.sort; let ts = sort_key.parse::().map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; - let val = row_value.content(); + let val = row_value.value; if val.len() != 1 { - bail!("Invalid operation, has {} values", row_value.content().len()); + bail!("Invalid operation, has {} values", val.len()); } match &val[0] { storage::Alternative::Value(v) => { let op = open_deserialize::(v, &self.key)?; - debug!("(sync) operation {}: {} {:?}", sort_key, base64::encode(v), op); + debug!("(sync) operation {}: {:?}", sort_key, op); ops.push((ts, op)); } storage::Alternative::Tombstone => { @@ -231,7 +211,7 @@ impl Bayou { // Save info that sync has been done self.last_sync = new_last_sync; - self.last_sync_watch_ct = self.k2v.from_orphan(new_last_sync_watch_ct).expect("Source & target storage must be compatible"); + self.last_sync_watch_ct = new_last_sync_watch_ct; Ok(()) } @@ -243,7 +223,7 @@ impl Bayou { Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5), _ => true, }; - let changed = self.last_sync_watch_ct.to_orphan() != *self.watch.rx.borrow(); + let changed = self.last_sync_watch_ct != *self.watch.rx.borrow(); if too_old || changed { self.sync().await?; } @@ -263,12 +243,12 @@ impl Bayou { .map(|(ts, _, _)| ts) .unwrap_or(&self.checkpoint.0), ); - self.k2v - .row(&self.path, &ts.to_string()) - .set_value(&seal_serialize(&op, &self.key)?) - .push() - .await?; + let row_val = storage::RowVal::new( + storage::RowRef::new(&self.path, &ts.to_string()), + seal_serialize(&op, &self.key)?, + ); + self.storage.row_insert(vec![row_val]).await?; self.watch.notify.notify_one(); let new_state = self.state().apply(&op); @@ -368,12 +348,11 @@ impl Bayou { let cryptoblob = seal_serialize(&state_cp, &self.key)?; debug!("(cp) checkpoint body length: {}", cryptoblob.len()); - self.s3 - .blob(format!("{}/checkpoint/{}", self.path, ts_cp.to_string()).as_str()) - .set_value(cryptoblob.into()) - .push() - .await?; - + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), + cryptoblob.into(), + ); + self.storage.blob_insert(&blob_val).await?; // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) let ecp_len = existing_checkpoints.len(); @@ -383,22 +362,16 @@ impl Bayou { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - self.s3 - .blob(key) - .rm() - .await?; + self.storage.blob_rm(&storage::BlobRef(key.to_string())).await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); - self.k2v - .rm(storage::Selector::Range{ - shard_key: &self.path, - begin: "", - end: &ts_ser - }) - .await?; - + self.storage.row_rm(&storage::Selector::Range { + shard: &self.path, + sort_begin: "", + sort_end: &ts_ser + }).await? } Ok(()) @@ -417,11 +390,11 @@ impl Bayou { async fn list_checkpoints(&self) -> Result> { let prefix = format!("{}/checkpoint/", self.path); - let checkpoints_res = self.s3.list(&prefix).await?; + let checkpoints_res = self.storage.blob_list(&prefix).await?; let mut checkpoints = vec![]; for object in checkpoints_res { - let key = object.key(); + let key = object.0; if let Some(ckid) = key.strip_prefix(&prefix) { if let Ok(ts) = ckid.parse::() { checkpoints.push((ts, key.into())); @@ -436,9 +409,8 @@ impl Bayou { // ---- Bayou watch in K2V ---- struct K2vWatch { - pk: String, - sk: String, - rx: watch::Receiver, + target: storage::RowRef, + rx: watch::Receiver, notify: Notify, } @@ -446,17 +418,17 @@ impl K2vWatch { /// Creates a new watch and launches subordinate threads. /// These threads hold Weak pointers to the struct; /// they exit when the Arc is dropped. - fn new(creds: &Credentials, pk: String, sk: String) -> Result> { - let row_client = creds.row_client()?; + fn new(creds: &Credentials, target: storage::RowRef) -> Result> { + let storage = creds.storage.build()?; - let (tx, rx) = watch::channel::(row_client.row(&pk, &sk).to_orphan()); + let (tx, rx) = watch::channel::(target.clone()); let notify = Notify::new(); - let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); + let watch = Arc::new(K2vWatch { target, rx, notify }); tokio::spawn(Self::background_task( Arc::downgrade(&watch), - row_client, + storage, tx, )); @@ -465,11 +437,11 @@ impl K2vWatch { async fn background_task( self_weak: Weak, - k2v: storage::RowStore, - tx: watch::Sender, + storage: storage::Store, + tx: watch::Sender, ) { let mut row = match Weak::upgrade(&self_weak) { - Some(this) => k2v.row(&this.pk, &this.sk), + Some(this) => this.target.clone(), None => { error!("can't start loop"); return @@ -479,20 +451,19 @@ impl K2vWatch { while let Some(this) = Weak::upgrade(&self_weak) { debug!( "bayou k2v watch bg loop iter ({}, {})", - this.pk, this.sk + this.target.uid.shard, this.target.uid.sort ); tokio::select!( _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - update = row.poll() => { - //update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + update = storage.row_poll(&row) => { match update { Err(e) => { error!("Error in bayou k2v wait value changed: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } Ok(new_value) => { - row = new_value.to_ref(); - if tx.send(row.to_orphan()).is_err() { + row = new_value.row_ref; + if tx.send(row.clone()).is_err() { break; } } @@ -500,7 +471,8 @@ impl K2vWatch { } _ = this.notify.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - if let Err(e) = row.set_value(&rand).push().await + let row_val = storage::RowVal::new(row.clone(), rand); + if let Err(e) = storage.row_insert(vec![row_val]).await { error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 4e3af3c..009605d 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -85,11 +85,11 @@ impl LdapLoginProvider { }) } - fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result { - let storage: Builders = match &self.storage_specific { - StorageSpecific::InMemory => Box::new(storage::in_memory::FullMem::new( + fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result { + let storage: Builder = match &self.storage_specific { + StorageSpecific::InMemory => storage::in_memory::MemBuilder::new( &get_attr(user, &self.username_attr)? - )), + ), StorageSpecific::Garage { from_config, bucket_source } => { let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; let aws_secret_access_key = get_attr(user, &from_config.aws_secret_access_key_attr)?; @@ -99,14 +99,14 @@ impl LdapLoginProvider { }; - Box::new(storage::garage::GrgCreds { + storage::garage::GarageBuilder::new(storage::garage::GarageConf { region: from_config.aws_region.clone(), s3_endpoint: from_config.s3_endpoint.clone(), k2v_endpoint: from_config.k2v_endpoint.clone(), aws_access_key_id, aws_secret_access_key, - bucket - }) + bucket, + })? }, }; diff --git a/src/login/mod.rs b/src/login/mod.rs index 9e0c437..d331522 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -33,23 +33,15 @@ pub type ArcLoginProvider = Arc; #[derive(Clone, Debug)] pub struct Credentials { /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) - pub storage: Builders, + pub storage: Builder, /// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V pub keys: CryptoKeys, } -impl Credentials { - pub fn row_client(&self) -> Result { - Ok(self.storage.row_store()?) - } - pub fn blob_client(&self) -> Result { - Ok(self.storage.blob_store()?) - } -} #[derive(Clone, Debug)] pub struct PublicCredentials { /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) - pub storage: Builders, + pub storage: Builder, pub public_key: PublicKey, } diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index 788a4c5..5896f16 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -88,16 +88,16 @@ impl LoginProvider for StaticLoginProvider { } tracing::debug!(user=%username, "fetch keys"); - let storage: storage::Builders = match &user.config.storage { - StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(username)), - StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => storage::in_memory::MemBuilder::new(username), + StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), aws_access_key_id: grgconf.aws_access_key_id.clone(), aws_secret_access_key: grgconf.aws_secret_access_key.clone(), bucket: grgconf.bucket.clone(), - }), + })?, }; let cr = CryptoRoot(user.config.crypto_root.clone()); @@ -114,16 +114,16 @@ impl LoginProvider for StaticLoginProvider { Some(u) => u, }; - let storage: storage::Builders = match &user.config.storage { - StaticStorage::InMemory => Box::new(storage::in_memory::FullMem::new(&user.username)), - StaticStorage::Garage(grgconf) => Box::new(storage::garage::GrgCreds { + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => storage::in_memory::MemBuilder::new(&user.username), + StaticStorage::Garage(grgconf) => storage::garage::GarageBuilder::new(storage::garage::GarageConf { region: grgconf.aws_region.clone(), k2v_endpoint: grgconf.k2v_endpoint.clone(), s3_endpoint: grgconf.s3_endpoint.clone(), aws_access_key_id: grgconf.aws_access_key_id.clone(), aws_secret_access_key: grgconf.aws_secret_access_key.clone(), bucket: grgconf.bucket.clone(), - }), + })?, }; let cr = CryptoRoot(user.config.crypto_root.clone()); diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index e3c729f..f6b831d 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -5,6 +5,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; //use tokio::io::AsyncReadExt; use tokio::sync::watch; @@ -50,13 +51,11 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.row_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.row_client()?; - let s3 = creds.blob_client()?; + let mut lock_held = k2v_lock_loop(creds.storage.build()?, storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK)); + let storage = creds.storage.build()?; let mut inbox: Option> = None; - let mut incoming_key = k2v.row(INCOMING_PK, INCOMING_WATCH_SK); + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { let maybe_updated_incoming_key = if *lock_held.borrow() { @@ -64,9 +63,9 @@ async fn incoming_mail_watch_process_internal( let wait_new_mail = async { loop { - match incoming_key.poll().await + match storage.row_poll(&incoming_key).await { - Ok(row_val) => break row_val.to_ref(), + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -77,7 +76,7 @@ async fn incoming_mail_watch_process_internal( tokio::select! { inc_k = wait_new_mail => Some(inc_k), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(k2v.from_orphan(incoming_key.to_orphan()).expect("Incompatible source & target storage")), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), _ = lock_held.changed() => None, _ = rx_inbox_id.changed() => None, } @@ -119,7 +118,7 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { incoming_key = updated_incoming_key; } @@ -136,20 +135,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc, - blobs: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, lock_held: &watch::Receiver, ) -> Result<()> { - let mails_res = blobs.list("incoming/").await?; + let mails_res = storage.blob_list("incoming/").await?; for object in mails_res { if !*lock_held.borrow() { break; } - let key = object.key(); + let key = object.0; if let Some(mail_id) = key.strip_prefix("incoming/") { if let Ok(mail_id) = mail_id.parse::() { - move_incoming_message(user, blobs, inbox, mail_id).await?; + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -159,7 +158,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc, - s3: &storage::BlobStore, + storage: &storage::Store, inbox: &Arc, id: UniqueIdent, ) -> Result<()> { @@ -168,14 +167,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let object = s3.blob(&object_key).fetch().await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers //info!("Object metadata: {:?}", get_result.metadata); let key_encrypted_b64 = object - .get_meta(MESSAGE_KEY) + .meta + .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -186,28 +186,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = object.content().ok_or(anyhow!("Missing object body"))?; + let obj_body = object.value; let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, object.to_ref(), message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - object.to_ref().rm().await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> watch::Receiver { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -216,13 +216,12 @@ fn k2v_lock_loop(k2v: storage::RowStore, pk: &'static str, sk: &'static str) -> enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, storage::OrphanRowRef), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: storage::RowStore, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender, ) { let (state_tx, mut state_rx) = watch::channel::(LockState::Unknown); @@ -232,10 +231,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture> = async { - let mut ct = k2v.row(pk, sk); + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match ct.poll().await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -246,7 +245,7 @@ async fn k2v_lock_loop_internal( } Ok(cv) => { let mut lock_state = None; - for v in cv.content().iter() { + for v in cv.value.iter() { if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); @@ -260,7 +259,7 @@ async fn k2v_lock_loop_internal( } } } - let new_ct = cv.to_ref(); + let new_ct = cv.row_ref; info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", @@ -268,7 +267,7 @@ async fn k2v_lock_loop_internal( ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.to_orphan())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; ct = new_ct; @@ -358,10 +357,10 @@ async fn k2v_lock_loop_internal( )); lock[8..].copy_from_slice(&our_pid.0); let row = match ct { - Some(orphan) => k2v.from_orphan(orphan).expect("Source & target must be storage compatible"), - None => k2v.row(pk, sk), + Some(existing) => existing, + None => row_ref.clone(), }; - if let Err(e) = row.set_value(&lock).push().await { + if let Err(e) = storage.row_insert(vec![storage::RowVal::new(row, lock)]).await { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -377,7 +376,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -387,8 +386,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let row = k2v.from_orphan(ct).expect("Incompatible source & target storage"); - let _ = row.rm().await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -410,30 +411,32 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.blob_store()?; - let k2v_client = creds.storage.row_store()?; + let storage = creds.storage.build()?; // Get causality token of previous watch key - let query = k2v_client.row(INCOMING_PK, INCOMING_WATCH_SK); - let watch_ct = match query.fetch().await { + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { Err(_) => query, - Ok(cv) => cv.to_ref(), + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - let mut send = s3_client - .blob(&format!("incoming/{}", gen_ident())) - .set_value(self.encrypted_body.clone().into()); - send.set_meta(MESSAGE_KEY, &key_header); - send.push().await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ).with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(&blob_val).await?; // Update watch key to signal new mail - watch_ct.set_value(gen_ident().0.as_ref()).push().await?; - + let watch_val = storage::RowVal::new( + watch_ct.clone(), + gen_ident().0.to_vec(), + ); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 060267a..b4afd5e 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -8,7 +8,7 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::storage::{RowStore, BlobStore, self}; +use crate::storage::{Store, RowRef, RowVal, BlobRef, BlobVal, Selector, self}; use crate::timestamp::now_msec; pub struct Mailbox { @@ -44,8 +44,7 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, encryption_key: creds.keys.master.clone(), - k2v: creds.storage.row_store()?, - s3: creds.storage.blob_store()?, + storage: creds.storage.build()?, uid_index, mail_path, }); @@ -178,10 +177,7 @@ struct MailboxInternal { id: UniqueIdent, mail_path: String, encryption_key: Key, - - k2v: RowStore, - s3: BlobStore, - + storage: Store, uid_index: Bayou, } @@ -200,15 +196,15 @@ impl MailboxInternal { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { let ids = ids.iter().map(|x| x.to_string()).collect::>(); - let ops = ids.iter().map(|id| (self.mail_path.as_str(), id.as_str())).collect::>(); - let res_vec = self.k2v.select(storage::Selector::List(ops)).await?; + let ops = ids.iter().map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())).collect::>(); + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; for res in res_vec.into_iter() { let mut meta_opt = None; // Resolve conflicts - for v in res.content().iter() { + for v in res.value.iter() { match v { storage::Alternative::Tombstone => (), storage::Alternative::Value(v) => { @@ -227,7 +223,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", res.to_ref().key()); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -235,9 +231,9 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result> { - let obj_res = self.s3.blob(&format!("{}/{}", self.mail_path, id)).fetch().await?; - let body = obj_res.content().ok_or(anyhow!("missing body"))?; - cryptoblob::open(body, message_key) + let obj_res = self.storage.blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))).await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -270,7 +266,10 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).set_value(message_blob).push().await?; + self.storage.blob_insert(&BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -282,7 +281,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -307,14 +309,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - blob_ref: storage::BlobRef, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, ident)); - blob_ref.copy(&dst).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -326,7 +328,10 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &ident.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync() @@ -350,13 +355,13 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - self.s3.blob(&format!("{}/{}", self.mail_path, ident)).rm().await?; + self.storage.blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))).await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - self.k2v.row(&self.mail_path, &sk).fetch().await?.to_ref().rm().await?; + self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; Ok::<_, anyhow::Error>(()) } )?; @@ -402,15 +407,19 @@ impl MailboxInternal { futures::try_join!( async { - let dst = self.s3.blob(&format!("{}/{}", self.mail_path, new_id)); - self.s3.blob(&format!("{}/{}", from.mail_path, source_id)).copy(&dst).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v.row(&self.mail_path, &new_id.to_string()).set_value(&meta_blob).push().await?; + self.storage.row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]).await?; Ok::<_, anyhow::Error>(()) }, self.uid_index.opportunistic_sync(), diff --git a/src/mail/user.rs b/src/mail/user.rs index bdfb30c..8413cbf 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: storage::RowStore, + pub storage: storage::Store, pub mailboxes: std::sync::Mutex>>, tx_inbox_id: watch::Sender>, @@ -41,7 +41,7 @@ pub struct User { impl User { pub async fn new(username: String, creds: Credentials) -> Result> { - let cache_key = (username.clone(), creds.storage.clone()); + let cache_key = (username.clone(), creds.storage.unique()); { let cache = USER_CACHE.lock().unwrap(); @@ -81,11 +81,7 @@ impl User { let mb_uidvalidity = mb.current_uid_index().await.uidvalidity; if mb_uidvalidity > uidvalidity { list.update_uidvalidity(name, mb_uidvalidity); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(Some(mb)) } else { @@ -108,11 +104,7 @@ impl User { let (mut list, ct) = self.load_mailbox_list().await?; match list.create_mailbox(name) { CreatedMailbox::Created(_, _) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } CreatedMailbox::Existed(_, _) => Err(anyhow!("Mailbox {} already exists", name)), @@ -129,11 +121,7 @@ impl User { if list.has_mailbox(name) { // TODO: actually delete mailbox contents list.set_mailbox(name, None); - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; Ok(()) } else { bail!("Mailbox {} does not exist", name); @@ -154,11 +142,7 @@ impl User { if old_name == INBOX { list.rename_mailbox(old_name, new_name)?; if !self.ensure_inbox_exists(&mut list, &ct).await? { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } } else { let names = list.existing_mailbox_names(); @@ -182,11 +166,7 @@ impl User { } } - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(&list, orphan).await?; + self.save_mailbox_list(&list, ct).await?; } Ok(()) } @@ -194,14 +174,14 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result> { - let k2v = creds.row_client()?; + let storage = creds.storage.build()?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let user = Arc::new(Self { username, creds: creds.clone(), - k2v, + storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), }); @@ -245,19 +225,25 @@ impl User { // ---- Mailbox list management ---- async fn load_mailbox_list(&self) -> Result<(MailboxList, Option)> { - let (mut list, row) = match self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK).fetch().await { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self.storage.row_fetch(&storage::Selector::Single(&row_ref)).await { Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), Ok(rv) => { let mut list = MailboxList::new(); - for v in rv.content() { + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(rv.to_ref())) + (list, Some(row_ref)) } }; @@ -278,11 +264,7 @@ impl User { let saved; let (inbox_id, inbox_uidvalidity) = match list.create_mailbox(INBOX) { CreatedMailbox::Created(i, v) => { - let orphan = match ct { - Some(x) => Some(x.to_orphan()), - None => None, - }; - self.save_mailbox_list(list, orphan).await?; + self.save_mailbox_list(list, ct.clone()).await?; saved = true; (i, v) } @@ -302,14 +284,12 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option, + ct: Option, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - let rref = match ct { - Some(x) => self.k2v.from_orphan(x).expect("Source & target must be same storage"), - None => self.k2v.row(MAILBOX_LIST_PK, MAILBOX_LIST_SK), - }; - rref.set_value(&list_blob).push().await?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; Ok(()) } } @@ -482,6 +462,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex>> = + static ref USER_CACHE: std::sync::Mutex>> = std::sync::Mutex::new(HashMap::new()); } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index 8276f70..ff37287 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -1,7 +1,8 @@ use crate::storage::*; +use serde::Serialize; -#[derive(Clone, Debug, Hash)] -pub struct GarageBuilder { +#[derive(Clone, Debug, Serialize)] +pub struct GarageConf { pub region: String, pub s3_endpoint: String, pub k2v_endpoint: String, @@ -10,10 +11,28 @@ pub struct GarageBuilder { pub bucket: String, } +#[derive(Clone, Debug)] +pub struct GarageBuilder { + conf: GarageConf, + unicity: Vec, +} + +impl GarageBuilder { + pub fn new(conf: GarageConf) -> anyhow::Result> { + let mut unicity: Vec = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.append(&mut rmp_serde::to_vec(&conf)?); + Ok(Arc::new(Self { conf, unicity })) + } +} + impl IBuilder for GarageBuilder { - fn build(&self) -> Box { + fn build(&self) -> Result { unimplemented!(); } + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } } pub struct GarageStore { @@ -33,7 +52,7 @@ impl IStore for GarageStore { unimplemented!(); } - async fn row_poll(&self, value: RowRef) -> Result { + async fn row_poll(&self, value: &RowRef) -> Result { unimplemented!(); } @@ -41,6 +60,9 @@ impl IStore for GarageStore { unimplemented!(); } + async fn blob_insert(&self, blob_val: &BlobVal) -> Result { + unimplemented!(); + } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { unimplemented!(); diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 09c6763..6d0460f 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -14,18 +14,36 @@ pub type ArcBlob = Arc>>>; #[derive(Clone, Debug)] pub struct MemBuilder { user: String, - url: String, + unicity: Vec, row: ArcRow, blob: ArcBlob, } +impl MemBuilder { + pub fn new(user: &str) -> Arc { + let mut unicity: Vec = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.extend_from_slice(user.as_bytes()); + Arc::new(Self { + user: user.to_string(), + unicity, + row: Arc::new(RwLock::new(HashMap::new())), + blob: Arc::new(RwLock::new(HashMap::new())), + }) + } +} + impl IBuilder for MemBuilder { - fn build(&self) -> Box { - Box::new(MemStore { + fn build(&self) -> Result { + Ok(Box::new(MemStore { row: self.row.clone(), blob: self.blob.clone(), - }) + })) } + + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } } pub struct MemStore { @@ -56,7 +74,7 @@ impl IStore for MemStore { .or(Err(StorageError::Internal))? .get(*shard) .ok_or(StorageError::NotFound)? - .range((Included(sort_begin.to_string()), Included(sort_end.to_string()))) + .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) .map(|(k, v)| RowVal { row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, value: vec![Alternative::Value(v.clone())], @@ -100,7 +118,7 @@ impl IStore for MemStore { }, Selector::Single(row_ref) => { let bytes = self.inner_fetch(row_ref)?; - Ok(vec![RowVal{ row_ref: row_ref.clone(), value: vec![Alternative::Value(bytes)]}]) + Ok(vec![RowVal{ row_ref: (*row_ref).clone(), value: vec![Alternative::Value(bytes)]}]) } } } @@ -113,7 +131,7 @@ impl IStore for MemStore { unimplemented!(); } - async fn row_poll(&self, value: RowRef) -> Result { + async fn row_poll(&self, value: &RowRef) -> Result { unimplemented!(); } @@ -121,6 +139,9 @@ impl IStore for MemStore { unimplemented!(); } + async fn blob_insert(&self, blob_val: &BlobVal) -> Result { + unimplemented!(); + } async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { unimplemented!(); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index cb66d58..8004ac5 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -11,9 +11,9 @@ pub mod in_memory; pub mod garage; -use std::hash::{Hash, Hasher}; +use std::sync::Arc; +use std::hash::Hash; use std::collections::HashMap; -use futures::future::BoxFuture; use async_trait::async_trait; #[derive(Debug, Clone)] @@ -23,45 +23,95 @@ pub enum Alternative { } type ConcurrentValues = Vec; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum StorageError { NotFound, Internal, } +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Storage Error: ")?; + match self { + Self::NotFound => f.write_str("Item not found"), + Self::Internal => f.write_str("An internal error occured"), + } + } +} +impl std::error::Error for StorageError {} -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RowUid { - shard: String, - sort: String, + pub shard: String, + pub sort: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct RowRef { - uid: RowUid, - causality: Option, + pub uid: RowUid, + pub causality: Option, +} + +impl RowRef { + pub fn new(shard: &str, sort: &str) -> Self { + Self { + uid: RowUid { + shard: shard.to_string(), + sort: sort.to_string(), + }, + causality: None, + } + } } #[derive(Debug, Clone)] pub struct RowVal { - row_ref: RowRef, - value: ConcurrentValues, + pub row_ref: RowRef, + pub value: ConcurrentValues, +} + +impl RowVal { + pub fn new(row_ref: RowRef, value: Vec) -> Self { + Self { + row_ref, + value: vec![Alternative::Value(value)], + } + } +} + + +#[derive(Debug, Clone)] +pub struct BlobRef(pub String); +impl BlobRef { + pub fn new(key: &str) -> Self { + Self(key.to_string()) + } } -#[derive(Debug, Clone)] -pub struct BlobRef(String); - #[derive(Debug, Clone)] pub struct BlobVal { - blob_ref: BlobRef, - meta: HashMap, - value: Vec, + pub blob_ref: BlobRef, + pub meta: HashMap, + pub value: Vec, +} +impl BlobVal { + pub fn new(blob_ref: BlobRef, value: Vec) -> Self { + Self { + blob_ref, value, + meta: HashMap::new(), + } + } + + pub fn with_meta(mut self, k: String, v: String) -> Self { + self.meta.insert(k, v); + self + } } pub enum Selector<'a> { Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, List (Vec), // list of (shard_key, sort_key) Prefix { shard: &'a str, sort_prefix: &'a str }, - Single(RowRef), + Single(&'a RowRef), } #[async_trait] @@ -69,131 +119,24 @@ pub trait IStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError>; async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; async fn row_insert(&self, values: Vec) -> Result<(), StorageError>; - async fn row_poll(&self, value: RowRef) -> Result; + async fn row_poll(&self, value: &RowRef) -> Result; async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result; + async fn blob_insert(&self, blob_val: &BlobVal) -> Result; async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result; async fn blob_list(&self, prefix: &str) -> Result, StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; } -pub trait IBuilder { - fn build(&self) -> Box; +#[derive(Clone,Debug,PartialEq,Eq,Hash)] +pub struct UnicityBuffer(Vec); + +pub trait IBuilder: std::fmt::Debug { + fn build(&self) -> Result; + + /// Returns an opaque buffer that uniquely identifies this builder + fn unique(&self) -> UnicityBuffer; } - - - - - -/* -#[derive(Clone, Debug, PartialEq)] -pub enum OrphanRowRef { - Garage(garage::GrgOrphanRowRef), - Memory(in_memory::MemOrphanRowRef), -} - - - - -impl std::fmt::Display for StorageError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("Storage Error: ")?; - match self { - Self::NotFound => f.write_str("Item not found"), - Self::Internal => f.write_str("An internal error occured"), - Self::IncompatibleOrphan => f.write_str("Incompatible orphan"), - } - } -} -impl std::error::Error for StorageError {} - -// Utils -pub type AsyncResult<'a, T> = BoxFuture<'a, Result>; - -// ----- Builders -pub trait IBuilders { - fn box_clone(&self) -> Builders; - fn row_store(&self) -> Result; - fn blob_store(&self) -> Result; - fn url(&self) -> &str; -} -pub type Builders = Box; -impl Clone for Builders { - fn clone(&self) -> Self { - self.box_clone() - } -} -impl std::fmt::Debug for Builders { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("aerogramme::storage::Builder") - } -} -impl PartialEq for Builders { - fn eq(&self, other: &Self) -> bool { - self.url() == other.url() - } -} -impl Eq for Builders {} -impl Hash for Builders { - fn hash(&self, state: &mut H) { - self.url().hash(state); - } -} - -// ------ Row -pub trait IRowStore -{ - fn row(&self, partition: &str, sort: &str) -> RowRef; - fn select(&self, selector: Selector) -> AsyncResult>; - fn rm(&self, selector: Selector) -> AsyncResult<()>; - fn from_orphan(&self, orphan: OrphanRowRef) -> Result; -} -pub type RowStore = Box; - -pub trait IRowRef: std::fmt::Debug -{ - fn to_orphan(&self) -> OrphanRowRef; - fn key(&self) -> (&str, &str); - fn set_value(&self, content: &[u8]) -> RowValue; - fn fetch(&self) -> AsyncResult; - fn rm(&self) -> AsyncResult<()>; - fn poll(&self) -> AsyncResult; -} -pub type RowRef<'a> = Box; - -pub trait IRowValue: std::fmt::Debug -{ - fn to_ref(&self) -> RowRef; - fn content(&self) -> ConcurrentValues; - fn push(&self) -> AsyncResult<()>; -} -pub type RowValue = Box; - -// ------- Blob -pub trait IBlobStore -{ - fn blob(&self, key: &str) -> BlobRef; - fn list(&self, prefix: &str) -> AsyncResult>; -} -pub type BlobStore = Box; - -pub trait IBlobRef -{ - fn set_value(&self, content: Vec) -> BlobValue; - fn key(&self) -> &str; - fn fetch(&self) -> AsyncResult; - fn copy(&self, dst: &BlobRef) -> AsyncResult<()>; - fn rm(&self) -> AsyncResult<()>; -} -pub type BlobRef = Box; - -pub trait IBlobValue { - fn to_ref(&self) -> BlobRef; - fn get_meta(&self, key: &str) -> Option<&[u8]>; - fn set_meta(&mut self, key: &str, val: &str); - fn content(&self) -> Option<&[u8]>; - fn push(&self) -> AsyncResult<()>; -} -pub type BlobValue = Box; -*/ +pub type Builder = Arc; +pub type Store = Box;