From 54c9736a247bb3534a285caa637c9afb052bc2dd Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Dec 2023 14:58:09 +0100 Subject: [PATCH] implemente garage storage --- src/mail/incoming.rs | 2 +- src/mail/mailbox.rs | 2 +- src/storage/garage.rs | 64 ++++++++++++++++++++++++++++++++++++---- src/storage/in_memory.rs | 60 ++++++++++++++++++++----------------- src/storage/mod.rs | 7 +---- 5 files changed, 95 insertions(+), 40 deletions(-) diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b17959a..7e33a9a 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - match storage.row_rm_single(&ct).await { + match storage.row_rm(&storage::Selector::Single(&ct)).await { Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), Ok(_) => (), }; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c925f39..6fb7dea 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -365,7 +365,7 @@ impl MailboxInternal { .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) .await?; if let Some(row_val) = res.into_iter().next() { - self.storage.row_rm_single(&row_val.row_ref).await?; + self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?; } Ok::<_, anyhow::Error>(()) } diff --git a/src/storage/garage.rs b/src/storage/garage.rs index fa6fbc1..f9ba756 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -168,7 +168,65 @@ impl IStore for GarageStore { Ok(row_vals) } async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { - unimplemented!(); + let del_op = match select { + Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: None, + start: Some(sort_begin), + end: Some(sort_end), + single_item: false, + }], + Selector::List(row_ref_list) => { + // Insert null values with causality token = delete + let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }).collect::>(); + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: Some(sort_prefix), + start: None, + end: None, + single_item: false, + }], + Selector::Single(row_ref) => { + // Insert null values with causality token = delete + let batch_op = vec![k2v_client::BatchInsertOp { + partition_key: &row_ref.uid.shard, + sort_key: &row_ref.uid.sort, + causality: row_ref.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }]; + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + }; + }, + }; + + // Finally here we only have prefix & range + match self.k2v.delete_batch(&del_op).await { + Err(e) => { + tracing::error!("delete batch error: {}", e); + Err(StorageError::Internal) + }, + Ok(_) => Ok(()), + } } async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { @@ -223,10 +281,6 @@ impl IStore for GarageStore { } } - async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { - unimplemented!(); - } - async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { let maybe_out = self.s3 .get_object() diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index d764da1..ee7c9a6 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -137,6 +137,32 @@ fn prefix_last_bound(prefix: &str) -> Bound { } } +impl MemStore { + fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + tracing::trace!(entry=%entry, command="row_rm_single"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let shard = &entry.uid.shard; + let sort = &entry.uid.sort; + + let cauz = match entry.causality.as_ref().map(|v| v.parse::()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Tombstone); + intval.version += 1; + intval.change.notify_waiters(); + + Ok(()) + } +} + #[async_trait] impl IStore for MemStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { @@ -183,37 +209,17 @@ impl IStore for MemStore { } } - async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { - tracing::trace!(entry=%entry, command="row_rm_single"); - let mut store = self.row.write().or(Err(StorageError::Internal))?; - let shard = &entry.uid.shard; - let sort = &entry.uid.sort; - - let cauz = match entry.causality.as_ref().map(|v| v.parse::()) { - Some(Ok(v)) => v, - _ => 0, - }; - - let bt = store.entry(shard.to_string()).or_default(); - let intval = bt.entry(sort.to_string()).or_default(); - - if cauz == intval.version { - intval.data.clear(); - } - intval.data.push(InternalData::Tombstone); - intval.version += 1; - intval.change.notify_waiters(); - - Ok(()) - } - async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { tracing::trace!(select=%select, command="row_rm"); - //@FIXME not efficient at all... - let values = self.row_fetch(select).await?; + + let values = match select { + Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::>(), + Selector::List(rlist) => rlist.clone(), + Selector::Single(row_ref) => vec![(*row_ref).clone()], + }; for v in values.into_iter() { - self.row_rm_single(&v.row_ref).await?; + self.row_rm_single(&v)?; } Ok(()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1b1faad..c81ffe4 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -90,11 +90,6 @@ impl RowVal { #[derive(Debug, Clone)] pub struct BlobRef(pub String); -impl BlobRef { - pub fn new(key: &str) -> Self { - Self(key.to_string()) - } -} impl std::fmt::Display for BlobRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "BlobRef({})", self.0) @@ -125,6 +120,7 @@ impl BlobVal { pub enum Selector<'a> { Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str }, List (Vec), // list of (shard_key, sort_key) + #[allow(dead_code)] Prefix { shard: &'a str, sort_prefix: &'a str }, Single(&'a RowRef), } @@ -143,7 +139,6 @@ impl<'a> std::fmt::Display for Selector<'a> { pub trait IStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError>; async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; - async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError>; async fn row_insert(&self, values: Vec) -> Result<(), StorageError>; async fn row_poll(&self, value: &RowRef) -> Result;