diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index f6b831d..2a6c947 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - match storage.row_rm(&storage::Selector::Single(&ct)).await { + match storage.row_rm_single(&ct).await { Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), Ok(_) => (), }; diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index b4afd5e..65f44b1 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -361,7 +361,12 @@ impl MailboxInternal { async { // Delete mail meta from K2V let sk = ident.to_string(); - self.storage.row_rm(&Selector::Single(&RowRef::new(&self.mail_path, &sk))).await?; + let res = self.storage + .row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk))) + .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage.row_rm_single(&row_val.row_ref).await?; + } Ok::<_, anyhow::Error>(()) } )?; diff --git a/src/storage/garage.rs b/src/storage/garage.rs index ff37287..f202067 100644 --- a/src/storage/garage.rs +++ b/src/storage/garage.rs @@ -56,14 +56,18 @@ impl IStore for GarageStore { unimplemented!(); } + async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + unimplemented!(); + } + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { unimplemented!(); } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result { + async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { unimplemented!(); } - async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { unimplemented!(); } diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs index 6d0460f..c18bec3 100644 --- a/src/storage/in_memory.rs +++ b/src/storage/in_memory.rs @@ -1,19 +1,67 @@ use crate::storage::*; use std::collections::{HashMap, BTreeMap}; -use std::ops::Bound::{Included, Unbounded, Excluded}; +use std::ops::Bound::{Included, Unbounded, Excluded, self}; use std::sync::{Arc, RwLock}; +use tokio::sync::Notify; /// This implementation is very inneficient, and not completely correct /// Indeed, when the connector is dropped, the memory is freed. /// It means that when a user disconnects, its data are lost. /// It's intended only for basic debugging, do not use it for advanced tests... -pub type ArcRow = Arc>>>>; -pub type ArcBlob = Arc>>>; +#[derive(Debug, Clone)] +enum InternalData { + Tombstone, + Value(Vec), +} +impl InternalData { + fn to_alternative(&self) -> Alternative { + match self { + Self::Tombstone => Alternative::Tombstone, + Self::Value(x) => Alternative::Value(x.clone()), + } + } +} + +#[derive(Debug, Default)] +struct InternalRowVal { + data: Vec, + version: u64, + change: Arc, +} +impl InternalRowVal { + fn concurrent_values(&self) -> Vec { + self.data.iter().map(InternalData::to_alternative).collect() + } + + fn to_row_val(&self, row_ref: RowRef) -> RowVal { + RowVal{ + row_ref: row_ref.with_causality(self.version.to_string()), + value: self.concurrent_values(), + } + } +} + +#[derive(Debug, Default, Clone)] +struct InternalBlobVal { + data: Vec, + metadata: HashMap, +} +impl InternalBlobVal { + fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { + BlobVal { + blob_ref: bref.clone(), + meta: self.metadata.clone(), + value: self.data.clone(), + } + } +} + +type ArcRow = Arc>>>; +type ArcBlob = Arc>>; #[derive(Clone, Debug)] pub struct MemBuilder { - user: String, unicity: Vec, row: ArcRow, blob: ArcBlob, @@ -25,10 +73,9 @@ impl MemBuilder { unicity.extend_from_slice(file!().as_bytes()); unicity.extend_from_slice(user.as_bytes()); Arc::new(Self { - user: user.to_string(), unicity, row: Arc::new(RwLock::new(HashMap::new())), - blob: Arc::new(RwLock::new(HashMap::new())), + blob: Arc::new(RwLock::new(BTreeMap::new())), }) } } @@ -51,105 +98,180 @@ pub struct MemStore { blob: ArcBlob, } -impl MemStore { - fn inner_fetch(&self, row_ref: &RowRef) -> Result, StorageError> { - Ok(self.row - .read() - .or(Err(StorageError::Internal))? - .get(&row_ref.uid.shard) - .ok_or(StorageError::NotFound)? - .get(&row_ref.uid.sort) - .ok_or(StorageError::NotFound)? - .clone()) +fn prefix_last_bound(prefix: &str) -> Bound { + let mut sort_end = prefix.to_string(); + match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } } } #[async_trait] impl IStore for MemStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError> { + let store = self.row.read().or(Err(StorageError::Internal))?; + match select { Selector::Range { shard, sort_begin, sort_end } => { - Ok(self.row - .read() - .or(Err(StorageError::Internal))? + Ok(store .get(*shard) .ok_or(StorageError::NotFound)? .range((Included(sort_begin.to_string()), Excluded(sort_end.to_string()))) - .map(|(k, v)| RowVal { - row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, - value: vec![Alternative::Value(v.clone())], - }) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::>()) }, Selector::List(rlist) => { let mut acc = vec![]; for row_ref in rlist { - let bytes = self.inner_fetch(row_ref)?; - let row_val = RowVal { - row_ref: row_ref.clone(), - value: vec![Alternative::Value(bytes)] - }; - acc.push(row_val); + let intval = store + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; + acc.push(intval.to_row_val(row_ref.clone())); } Ok(acc) }, Selector::Prefix { shard, sort_prefix } => { - let mut sort_end = sort_prefix.to_string(); - let last_bound = match sort_end.pop() { - None => Unbounded, - Some(ch) => { - let nc = char::from_u32(ch as u32 + 1).unwrap(); - sort_end.push(nc); - Excluded(sort_end) - } - }; + let last_bound = prefix_last_bound(sort_prefix); - Ok(self.row - .read() - .or(Err(StorageError::Internal))? + Ok(store .get(*shard) .ok_or(StorageError::NotFound)? .range((Included(sort_prefix.to_string()), last_bound)) - .map(|(k, v)| RowVal { - row_ref: RowRef { uid: RowUid { shard: shard.to_string(), sort: k.to_string() }, causality: Some("c".to_string()) }, - value: vec![Alternative::Value(v.clone())], - }) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) .collect::>()) }, Selector::Single(row_ref) => { - let bytes = self.inner_fetch(row_ref)?; - Ok(vec![RowVal{ row_ref: (*row_ref).clone(), value: vec![Alternative::Value(bytes)]}]) + let intval = store + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; + Ok(vec![intval.to_row_val((*row_ref).clone())]) } } } + async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let shard = &entry.uid.shard; + let sort = &entry.uid.sort; + + let cauz = match entry.causality.as_ref().map(|v| v.parse::()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Tombstone); + intval.version += 1; + intval.change.notify_waiters(); + + Ok(()) + } + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { - unimplemented!(); + //@FIXME not efficient at all... + let values = self.row_fetch(select).await?; + + for v in values.into_iter() { + self.row_rm_single(&v.row_ref).await?; + } + Ok(()) } async fn row_insert(&self, values: Vec) -> Result<(), StorageError> { - unimplemented!(); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + for v in values.into_iter() { + let shard = v.row_ref.uid.shard; + let sort = v.row_ref.uid.sort; + let val = match v.value.into_iter().next() { + Some(Alternative::Value(x)) => x, + _ => vec![], + }; + + let cauz = match v.row_ref.causality.map(|v| v.parse::()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard).or_default(); + let intval = bt.entry(sort).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Value(val)); + intval.version += 1; + intval.change.notify_waiters(); + } + Ok(()) } async fn row_poll(&self, value: &RowRef) -> Result { - unimplemented!(); + let shard = &value.uid.shard; + let sort = &value.uid.sort; + let cauz = match value.causality.as_ref().map(|v| v.parse::()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let notify_me = { + let store = self.row.read().or(Err(StorageError::Internal))?; + let intval = store + .get(shard) + .ok_or(StorageError::NotFound)? + .get(sort) + .ok_or(StorageError::NotFound)?; + + if intval.version != cauz { + return Ok(intval.to_row_val(value.clone())); + } + intval.change.clone() + }; + + notify_me.notified().await; + + let res = self.row_fetch(&Selector::Single(value)).await?; + res.into_iter().next().ok_or(StorageError::NotFound) } async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result { - unimplemented!(); - + let store = self.blob.read().or(Err(StorageError::Internal))?; + store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref)) } - async fn blob_insert(&self, blob_val: &BlobVal) -> Result { - unimplemented!(); + async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> { + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); + entry.data = blob_val.value.clone(); + entry.metadata = blob_val.meta.clone(); + Ok(()) } - async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result { - unimplemented!(); - + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let blob_src = store.entry(src.0.clone()).or_default().clone(); + store.insert(dst.0.clone(), blob_src); + Ok(()) } async fn blob_list(&self, prefix: &str) -> Result, StorageError> { - unimplemented!(); + let store = self.blob.read().or(Err(StorageError::Internal))?; + let last_bound = prefix_last_bound(prefix); + let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::>(); + Ok(blist) } async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { - unimplemented!(); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + store.remove(&blob_ref.0); + Ok(()) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 8004ac5..a21e07d 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -61,6 +61,10 @@ impl RowRef { causality: None, } } + pub fn with_causality(mut self, causality: String) -> Self { + self.causality = Some(causality); + self + } } #[derive(Debug, Clone)] @@ -118,12 +122,13 @@ pub enum Selector<'a> { pub trait IStore { async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result, StorageError>; async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; + async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError>; async fn row_insert(&self, values: Vec) -> Result<(), StorageError>; async fn row_poll(&self, value: &RowRef) -> Result; async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result; - async fn blob_insert(&self, blob_val: &BlobVal) -> Result; - async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result; + async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError>; + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; async fn blob_list(&self, prefix: &str) -> Result, StorageError>; async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; }