in-memory storage #32
2 changed files with 37 additions and 8 deletions
|
@ -122,13 +122,14 @@ fn prefix_last_bound(prefix: &str) -> Bound<String> {
|
|||
#[async_trait]
|
||||
impl IStore for MemStore {
|
||||
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
|
||||
tracing::trace!(select=%select, command="row_fetch");
|
||||
let store = self.row.read().or(Err(StorageError::Internal))?;
|
||||
|
||||
match select {
|
||||
Selector::Range { shard, sort_begin, sort_end } => {
|
||||
Ok(store
|
||||
.get(*shard)
|
||||
.ok_or(StorageError::NotFound)?
|
||||
.unwrap_or(&BTreeMap::new())
|
||||
.range((Included(sort_begin.to_string()), Excluded(sort_end.to_string())))
|
||||
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
|
||||
.collect::<Vec<_>>())
|
||||
|
@ -136,12 +137,10 @@ impl IStore for MemStore {
|
|||
Selector::List(rlist) => {
|
||||
let mut acc = vec![];
|
||||
for row_ref in rlist {
|
||||
let intval = store
|
||||
.get(&row_ref.uid.shard)
|
||||
.ok_or(StorageError::NotFound)?
|
||||
.get(&row_ref.uid.sort)
|
||||
.ok_or(StorageError::NotFound)?;
|
||||
acc.push(intval.to_row_val(row_ref.clone()));
|
||||
let maybe_intval = store.get(&row_ref.uid.shard).map(|v| v.get(&row_ref.uid.sort)).flatten();
|
||||
if let Some(intval) = maybe_intval {
|
||||
acc.push(intval.to_row_val(row_ref.clone()));
|
||||
}
|
||||
}
|
||||
Ok(acc)
|
||||
},
|
||||
|
@ -150,7 +149,7 @@ impl IStore for MemStore {
|
|||
|
||||
Ok(store
|
||||
.get(*shard)
|
||||
.ok_or(StorageError::NotFound)?
|
||||
.unwrap_or(&BTreeMap::new())
|
||||
.range((Included(sort_prefix.to_string()), last_bound))
|
||||
.map(|(k, v)| v.to_row_val(RowRef::new(shard, k)))
|
||||
.collect::<Vec<_>>())
|
||||
|
@ -167,6 +166,7 @@ impl IStore for MemStore {
|
|||
}
|
||||
|
||||
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
|
||||
tracing::trace!(entry=%entry, command="row_rm_single");
|
||||
let mut store = self.row.write().or(Err(StorageError::Internal))?;
|
||||
let shard = &entry.uid.shard;
|
||||
let sort = &entry.uid.sort;
|
||||
|
@ -190,6 +190,7 @@ impl IStore for MemStore {
|
|||
}
|
||||
|
||||
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
|
||||
tracing::trace!(select=%select, command="row_rm");
|
||||
//@FIXME not efficient at all...
|
||||
let values = self.row_fetch(select).await?;
|
||||
|
||||
|
@ -200,6 +201,7 @@ impl IStore for MemStore {
|
|||
}
|
||||
|
||||
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
|
||||
tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert");
|
||||
let mut store = self.row.write().or(Err(StorageError::Internal))?;
|
||||
for v in values.into_iter() {
|
||||
let shard = v.row_ref.uid.shard;
|
||||
|
@ -228,6 +230,7 @@ impl IStore for MemStore {
|
|||
Ok(())
|
||||
}
|
||||
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> {
|
||||
tracing::trace!(entry=%value, command="row_poll");
|
||||
let shard = &value.uid.shard;
|
||||
let sort = &value.uid.sort;
|
||||
let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) {
|
||||
|
@ -253,10 +256,12 @@ impl IStore for MemStore {
|
|||
}
|
||||
|
||||
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
|
||||
tracing::trace!(entry=%blob_ref, command="blob_fetch");
|
||||
let store = self.blob.read().or(Err(StorageError::Internal))?;
|
||||
store.get(&blob_ref.0).ok_or(StorageError::NotFound).map(|v| v.to_blob_val(blob_ref))
|
||||
}
|
||||
async fn blob_insert(&self, blob_val: &BlobVal) -> Result<(), StorageError> {
|
||||
tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert");
|
||||
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
|
||||
let entry = store.entry(blob_val.blob_ref.0.clone()).or_default();
|
||||
entry.data = blob_val.value.clone();
|
||||
|
@ -264,18 +269,21 @@ impl IStore for MemStore {
|
|||
Ok(())
|
||||
}
|
||||
async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> {
|
||||
tracing::trace!(src=%src, dst=%dst, command="blob_copy");
|
||||
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
|
||||
let blob_src = store.entry(src.0.clone()).or_default().clone();
|
||||
store.insert(dst.0.clone(), blob_src);
|
||||
Ok(())
|
||||
}
|
||||
async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> {
|
||||
tracing::trace!(prefix=prefix, command="blob_list");
|
||||
let store = self.blob.read().or(Err(StorageError::Internal))?;
|
||||
let last_bound = prefix_last_bound(prefix);
|
||||
let blist = store.range((Included(prefix.to_string()), last_bound)).map(|(k, _)| BlobRef(k.to_string())).collect::<Vec<_>>();
|
||||
Ok(blist)
|
||||
}
|
||||
async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> {
|
||||
tracing::trace!(entry=%blob_ref, command="blob_rm");
|
||||
let mut store = self.blob.write().or(Err(StorageError::Internal))?;
|
||||
store.remove(&blob_ref.0);
|
||||
Ok(())
|
||||
|
|
|
@ -50,6 +50,11 @@ pub struct RowRef {
|
|||
pub uid: RowUid,
|
||||
pub causality: Option<String>,
|
||||
}
|
||||
impl std::fmt::Display for RowRef {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "RowRef({}, {}, {:?})", self.uid.shard, self.uid.sort, self.causality)
|
||||
}
|
||||
}
|
||||
|
||||
impl RowRef {
|
||||
pub fn new(shard: &str, sort: &str) -> Self {
|
||||
|
@ -90,6 +95,11 @@ impl BlobRef {
|
|||
Self(key.to_string())
|
||||
}
|
||||
}
|
||||
impl std::fmt::Display for BlobRef {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "BlobRef({})", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlobVal {
|
||||
|
@ -111,12 +121,23 @@ impl BlobVal {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Selector<'a> {
|
||||
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
|
||||
List (Vec<RowRef>), // list of (shard_key, sort_key)
|
||||
Prefix { shard: &'a str, sort_prefix: &'a str },
|
||||
Single(&'a RowRef),
|
||||
}
|
||||
impl<'a> std::fmt::Display for Selector<'a> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Range { shard, sort_begin, sort_end } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end),
|
||||
Self::List(list) => write!(f, "List({:?})", list),
|
||||
Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix),
|
||||
Self::Single(row_ref) => write!(f, "Single({})", row_ref),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait IStore {
|
||||
|
|
Loading…
Reference in a new issue