in-memory storage #32

Merged
quentin merged 65 commits from in-memory into main 2023-12-27 16:35:43 +00:00
5 changed files with 95 additions and 40 deletions
Showing only changes of commit 54c9736a24 - Show all commits

View file

@ -386,7 +386,7 @@ async fn k2v_lock_loop_internal(
_ => None,
};
if let Some(ct) = release {
match storage.row_rm_single(&ct).await {
match storage.row_rm(&storage::Selector::Single(&ct)).await {
Err(e) => warn!("Unable to release lock {:?}: {}", ct, e),
Ok(_) => (),
};

View file

@ -365,7 +365,7 @@ impl MailboxInternal {
.row_fetch(&storage::Selector::Single(&RowRef::new(&self.mail_path, &sk)))
.await?;
if let Some(row_val) = res.into_iter().next() {
self.storage.row_rm_single(&row_val.row_ref).await?;
self.storage.row_rm(&storage::Selector::Single(&row_val.row_ref)).await?;
}
Ok::<_, anyhow::Error>(())
}

View file

@ -168,7 +168,65 @@ impl IStore for GarageStore {
Ok(row_vals)
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
unimplemented!();
let del_op = match select {
Selector::Range { shard, sort_begin, sort_end } => vec![k2v_client::BatchDeleteOp {
partition_key: shard,
prefix: None,
start: Some(sort_begin),
end: Some(sort_end),
single_item: false,
}],
Selector::List(row_ref_list) => {
// Insert null values with causality token = delete
let batch_op = row_ref_list.iter().map(|v| k2v_client::BatchInsertOp {
partition_key: &v.uid.shard,
sort_key: &v.uid.sort,
causality: v.causality.clone().map(|ct| ct.into()),
value: k2v_client::K2vValue::Tombstone,
}).collect::<Vec<_>>();
return match self.k2v.insert_batch(&batch_op).await {
Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal)
},
Ok(_) => Ok(()),
};
},
Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp {
partition_key: shard,
prefix: Some(sort_prefix),
start: None,
end: None,
single_item: false,
}],
Selector::Single(row_ref) => {
// Insert null values with causality token = delete
let batch_op = vec![k2v_client::BatchInsertOp {
partition_key: &row_ref.uid.shard,
sort_key: &row_ref.uid.sort,
causality: row_ref.causality.clone().map(|ct| ct.into()),
value: k2v_client::K2vValue::Tombstone,
}];
return match self.k2v.insert_batch(&batch_op).await {
Err(e) => {
tracing::error!("Unable to delete the list of values: {}", e);
Err(StorageError::Internal)
},
Ok(_) => Ok(()),
};
},
};
// Finally here we only have prefix & range
match self.k2v.delete_batch(&del_op).await {
Err(e) => {
tracing::error!("delete batch error: {}", e);
Err(StorageError::Internal)
},
Ok(_) => Ok(()),
}
}
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> {
@ -223,10 +281,6 @@ impl IStore for GarageStore {
}
}
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
unimplemented!();
}
async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> {
let maybe_out = self.s3
.get_object()

View file

@ -137,6 +137,32 @@ fn prefix_last_bound(prefix: &str) -> Bound<String> {
}
}
impl MemStore {
fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
tracing::trace!(entry=%entry, command="row_rm_single");
let mut store = self.row.write().or(Err(StorageError::Internal))?;
let shard = &entry.uid.shard;
let sort = &entry.uid.sort;
let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
Some(Ok(v)) => v,
_ => 0,
};
let bt = store.entry(shard.to_string()).or_default();
let intval = bt.entry(sort.to_string()).or_default();
if cauz == intval.version {
intval.data.clear();
}
intval.data.push(InternalData::Tombstone);
intval.version += 1;
intval.change.notify_waiters();
Ok(())
}
}
#[async_trait]
impl IStore for MemStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> {
@ -183,37 +209,17 @@ impl IStore for MemStore {
}
}
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> {
tracing::trace!(entry=%entry, command="row_rm_single");
let mut store = self.row.write().or(Err(StorageError::Internal))?;
let shard = &entry.uid.shard;
let sort = &entry.uid.sort;
let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) {
Some(Ok(v)) => v,
_ => 0,
};
let bt = store.entry(shard.to_string()).or_default();
let intval = bt.entry(sort.to_string()).or_default();
if cauz == intval.version {
intval.data.clear();
}
intval.data.push(InternalData::Tombstone);
intval.version += 1;
intval.change.notify_waiters();
Ok(())
}
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> {
tracing::trace!(select=%select, command="row_rm");
//@FIXME not efficient at all...
let values = self.row_fetch(select).await?;
let values = match select {
Selector::Range { .. } | Selector::Prefix { .. } => self.row_fetch(select).await?.into_iter().map(|rv| rv.row_ref).collect::<Vec<_>>(),
Selector::List(rlist) => rlist.clone(),
Selector::Single(row_ref) => vec![(*row_ref).clone()],
};
for v in values.into_iter() {
self.row_rm_single(&v.row_ref).await?;
self.row_rm_single(&v)?;
}
Ok(())
}

View file

@ -90,11 +90,6 @@ impl RowVal {
#[derive(Debug, Clone)]
pub struct BlobRef(pub String);
impl BlobRef {
pub fn new(key: &str) -> Self {
Self(key.to_string())
}
}
impl std::fmt::Display for BlobRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "BlobRef({})", self.0)
@ -125,6 +120,7 @@ impl BlobVal {
pub enum Selector<'a> {
Range { shard: &'a str, sort_begin: &'a str, sort_end: &'a str },
List (Vec<RowRef>), // list of (shard_key, sort_key)
#[allow(dead_code)]
Prefix { shard: &'a str, sort_prefix: &'a str },
Single(&'a RowRef),
}
@ -143,7 +139,6 @@ impl<'a> std::fmt::Display for Selector<'a> {
pub trait IStore {
async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>;
async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>;
async fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError>;
async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>;
async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>;