RPC code to insert single values in K2V item table

Alex 2022-04-15 12:14:10 +02:00
parent da14343ea7
commit a9a1d5532d
Signed by: lx
GPG key ID: 0E496D15096376BE
8 changed files with 119 additions and 34 deletions

@@ -50,7 +50,7 @@ impl K2VItem {
 	pub fn update(
 		&mut self,
 		this_node: Uuid,
-		context: Option<CausalContext>,
+		context: &Option<CausalContext>,
 		new_value: DvvsValue,
 	) {
 		if let Some(context) = context {
@@ -191,7 +191,7 @@ impl TableSchema for K2VItemTable {
 	type E = K2VItem;
 	type Filter = ItemFilter;
 
-	fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {
+	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
 		// nothing for now
 	}
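To make the borrowed causal context concrete, here is a small illustrative sketch, not part of the commit: the helper, its arguments, and the literal keys are hypothetical, and it assumes DvvsValue::Value carries the raw bytes as in the item table.

// Hypothetical illustration of K2VItem::update with the &Option<CausalContext> argument.
fn two_blind_writes(bucket_id: Uuid, node_a: Uuid, node_b: Uuid) -> K2VItem {
	let mut item = K2VItem::new(bucket_id, "p".to_string(), "k".to_string());
	// No causal context: neither write knows about the other, so both values
	// remain as concurrent versions of the item.
	item.update(node_a, &None, DvvsValue::Value(b"v1".to_vec()));
	item.update(node_b, &None, DvvsValue::Value(b"v2".to_vec()));
	// Passing &Some(context) instead marks the versions covered by that context
	// as superseded, so the new value replaces them rather than coexisting.
	item
}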

@@ -17,7 +17,8 @@ use garage_rpc::system::System;
 use garage_rpc::*;
 
 use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::Table;
+use garage_table::table::TABLE_RPC_TIMEOUT;
+use garage_table::{PartitionKey, Table};
 
 use crate::k2v::causality::*;
 use crate::k2v::item_table::*;
@@ -27,8 +28,7 @@ use crate::k2v::item_table::*;
 pub enum K2VRpc {
 	Ok,
 	InsertItem {
-		bucket_id: Uuid,
-		partition_key: String,
+		partition: K2VItemPartition,
 		sort_key: String,
 		causal_context: Option<CausalContext>,
 		value: DvvsValue,
@@ -63,15 +63,79 @@ impl K2VRpcHandler {
 		rpc_handler
 	}
 
-	async fn handle_insert(
+	// ---- public interface ----
+
+	pub async fn insert(
 		&self,
 		bucket_id: Uuid,
-		partition_key: &str,
+		partition_key: String,
 		sort_key: String,
 		causal_context: Option<CausalContext>,
 		value: DvvsValue,
+	) -> Result<(), Error> {
+		let partition = K2VItemPartition {
+			bucket_id,
+			partition_key,
+		};
+		let mut who = self
+			.item_table
+			.data
+			.replication
+			.write_nodes(&partition.hash());
+		who.sort();
+
+		self.system
+			.rpc
+			.try_call_many(
+				&self.endpoint,
+				&who[..],
+				K2VRpc::InsertItem {
+					partition,
+					sort_key,
+					causal_context,
+					value,
+				},
+				RequestStrategy::with_priority(PRIO_NORMAL)
+					.with_quorum(1)
+					.with_timeout(TABLE_RPC_TIMEOUT),
+			)
+			.await?;
+
+		Ok(())
+	}
+
+	// ---- internal handlers ----
+
+	#[allow(clippy::ptr_arg)]
+	async fn handle_insert(
+		&self,
+		partition: &K2VItemPartition,
+		sort_key: &String,
+		causal_context: &Option<CausalContext>,
+		value: &DvvsValue,
 	) -> Result<K2VRpc, Error> {
-		unimplemented!() //TODO
+		let tree_key = self.item_table.data.tree_key(partition, sort_key);
+
+		let new = self
+			.item_table
+			.data
+			.update_entry_with(&tree_key[..], |ent| {
+				let mut ent = ent.unwrap_or_else(|| {
+					K2VItem::new(
+						partition.bucket_id,
+						partition.partition_key.clone(),
+						sort_key.clone(),
+					)
+				});
+				ent.update(self.system.id, causal_context, value.clone());
+				ent
+			})?;
+
+		// Propagate to rest of network
+		if let Some(updated) = new {
+			self.item_table.insert(&updated).await?;
+		}
+
+		Ok(K2VRpc::Ok)
 	}
 }
@@ -80,13 +144,12 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
 	async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
 		match message {
 			K2VRpc::InsertItem {
-				bucket_id,
-				partition_key,
+				partition,
 				sort_key,
 				causal_context,
 				value,
 			} => {
-				self.handle_insert(*bucket_id, partition_key, sort_key, causal_context, value)
+				self.handle_insert(partition, sort_key, causal_context, value)
 					.await
 			}
 			m => Err(Error::unexpected_rpc_message(m)),
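For orientation, a sketch of how a caller might drive the new public interface; the surrounding function and the literal keys and values are hypothetical, not part of this commit.

// Hypothetical call site: insert one value under (bucket, partition key, sort key).
// insert() needs an answer from only one write node (quorum 1); that node merges the
// value locally in handle_insert() and then propagates it to the other replicas.
async fn put_one(rpc: &K2VRpcHandler, bucket_id: Uuid) -> Result<(), Error> {
	rpc.insert(
		bucket_id,
		"mailbox:inbox".to_string(),         // partition key (example value)
		"msg-0001".to_string(),              // sort key (example value)
		None,                                // no causal context: a blind write
		DvvsValue::Value(b"hello".to_vec()),
	)
	.await
}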

@@ -51,11 +51,11 @@ impl TableSchema for BlockRefTable {
 	type E = BlockRef;
 	type Filter = DeletedFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		#[allow(clippy::or_fun_call)]
-		let block = &old.as_ref().or(new.as_ref()).unwrap().block;
-		let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
-		let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+		let block = &old.or(new).unwrap().block;
+		let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
+		let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
 		if is_after && !was_before {
 			if let Err(e) = self.block_manager.block_incref(block) {
 				warn!("block_incref failed for block {:?}: {}", block, e);
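The refcounting decision in this hook depends only on whether the reference flips between live and deleted; the symmetric decref branch lives in the same function, outside this hunk. Restated as a small self-contained sketch (the helper is hypothetical):

// Sketch (hypothetical helper): the liveness transition that drives block refcounting.
fn block_ref_transition(was_before: bool, is_after: bool) -> &'static str {
	match (was_before, is_after) {
		(false, true) => "block_incref: a live reference appeared",
		(true, false) => "block_decref: the reference was deleted",
		_ => "liveness unchanged: nothing to do",
	}
}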

@@ -232,8 +232,11 @@ impl TableSchema for ObjectTable {
 	type E = Object;
 	type Filter = ObjectFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		let version_table = self.version_table.clone();
+		let old = old.cloned();
+		let new = new.cloned();
+
 		self.background.spawn(async move {
 			if let (Some(old_v), Some(new_v)) = (old, new) {
 				// Propagate deletion of old versions

@@ -137,8 +137,11 @@ impl TableSchema for VersionTable {
 	type E = Version;
 	type Filter = DeletedFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		let block_ref_table = self.block_ref_table.clone();
+		let old = old.cloned();
+		let new = new.cloned();
+
 		self.background.spawn(async move {
 			if let (Some(old_v), Some(new_v)) = (old, new) {
 				// Propagate deletion of version blocks

@@ -20,8 +20,8 @@ use crate::schema::*;
 pub struct TableData<F: TableSchema, R: TableReplication> {
 	system: Arc<System>,
 
-	pub(crate) instance: F,
-	pub(crate) replication: R,
+	pub instance: F,
+	pub replication: R,
 
 	pub store: sled::Tree,
@@ -136,17 +136,31 @@ where
 		let update = self.decode_entry(update_bytes)?;
 		let tree_key = self.tree_key(update.partition_key(), update.sort_key());
 
+		self.update_entry_with(&tree_key[..], |ent| match ent {
+			Some(mut ent) => {
+				ent.merge(&update);
+				ent
+			}
+			None => update.clone(),
+		})?;
+		Ok(())
+	}
+
+	pub fn update_entry_with(
+		&self,
+		tree_key: &[u8],
+		f: impl Fn(Option<F::E>) -> F::E,
+	) -> Result<Option<F::E>, Error> {
 		let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
-			let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? {
+			let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
 				Some(old_bytes) => {
 					let old_entry = self
 						.decode_entry(&old_bytes)
 						.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-					let mut new_entry = old_entry.clone();
-					new_entry.merge(&update);
+					let new_entry = f(Some(old_entry.clone()));
 					(Some(old_entry), Some(old_bytes), new_entry)
 				}
-				None => (None, None, update.clone()),
+				None => (None, None, f(None)),
 			};
 
 			// Scenario 1: the value changed, so of course there is a change
@@ -163,8 +177,8 @@
 
 			if value_changed || encoding_changed {
 				let new_bytes_hash = blake2sum(&new_bytes[..]);
-				mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
-				store.insert(tree_key.clone(), new_bytes)?;
+				mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
+				store.insert(tree_key.to_vec(), new_bytes)?;
 				Ok(Some((old_entry, new_entry, new_bytes_hash)))
 			} else {
 				Ok(None)
@@ -175,7 +189,7 @@
 			self.metrics.internal_update_counter.add(1);
 
 			let is_tombstone = new_entry.is_tombstone();
-			self.instance.updated(old_entry, Some(new_entry));
+			self.instance.updated(old_entry.as_ref(), Some(&new_entry));
 			self.merkle_todo_notify.notify_one();
 			if is_tombstone {
 				// We are only responsible for GC'ing this item if we are the
@@ -187,12 +201,14 @@
 				let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
 				let nodes = self.replication.write_nodes(&pk_hash);
 				if nodes.first() == Some(&self.system.id) {
-					GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
+					GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
 				}
 			}
-		}
 
-		Ok(())
+			Ok(Some(new_entry))
+		} else {
+			Ok(None)
+		}
 	}
 
 	pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
@@ -211,7 +227,7 @@
 			self.metrics.internal_delete_counter.add(1);
 
 			let old_entry = self.decode_entry(v)?;
-			self.instance.updated(Some(old_entry), None);
+			self.instance.updated(Some(&old_entry), None);
 			self.merkle_todo_notify.notify_one();
 		}
 		Ok(removed)
@@ -235,7 +251,7 @@
 
 		if let Some(old_v) = removed {
 			let old_entry = self.decode_entry(&old_v[..])?;
-			self.instance.updated(Some(old_entry), None);
+			self.instance.updated(Some(&old_entry), None);
 			self.merkle_todo_notify.notify_one();
 			Ok(true)
 		} else {
@@ -245,7 +261,7 @@
 
 	// ---- Utility functions ----
 
-	pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+	pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
 		let mut ret = p.hash().to_vec();
 		ret.extend(s.sort_key());
 		ret
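The new update_entry_with is the read-modify-write primitive that handle_insert builds on: the closure sees the currently stored entry (if any) inside the sled transaction and returns the entry to store, and Some(new_entry) comes back only if the stored bytes actually changed. A hedged sketch of a generic caller follows; the helper itself is hypothetical and the usual table/CRDT trait imports are assumed.

// Hypothetical helper: apply a CRDT merge locally and replicate only when
// update_entry_with reports that something actually changed on disk.
async fn merge_and_propagate<F, R>(table: &Table<F, R>, new_entry: F::E) -> Result<(), Error>
where
	F: TableSchema + 'static,
	R: TableReplication + 'static,
{
	let tree_key = table
		.data
		.tree_key(new_entry.partition_key(), new_entry.sort_key());
	let changed = table.data.update_entry_with(&tree_key[..], |cur| match cur {
		Some(mut cur) => {
			cur.merge(&new_entry); // CRDT merge keeps the write idempotent
			cur
		}
		None => new_entry.clone(),
	})?;
	if let Some(entry) = changed {
		table.insert(&entry).await?; // push the merged entry to the other replicas
	}
	Ok(())
}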

@@ -86,7 +86,7 @@ pub trait TableSchema: Send + Sync {
 	// as the update itself is an unchangeable fact that will never go back
 	// due to CRDT logic. Typically errors in propagation of info should be logged
 	// to stderr.
-	fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {}
 
 	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
 }

@@ -27,7 +27,7 @@ use crate::replication::*;
 use crate::schema::*;
 use crate::sync::*;
 
-const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
 
 pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
 	pub system: Arc<System>,