From a9a1d5532db2094790a9cc01d986ff0ff9100596 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 15 Apr 2022 12:14:10 +0200 Subject: [PATCH] RPC code to insert single values in K2V item table --- src/model/k2v/item_table.rs | 4 +- src/model/k2v/rpc.rs | 81 +++++++++++++++++++++++++++++---- src/model/s3/block_ref_table.rs | 8 ++-- src/model/s3/object_table.rs | 5 +- src/model/s3/version_table.rs | 5 +- src/table/data.rs | 46 +++++++++++++------ src/table/schema.rs | 2 +- src/table/table.rs | 2 +- 8 files changed, 119 insertions(+), 34 deletions(-) diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index d3ef5769..b369df49 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -50,7 +50,7 @@ impl K2VItem { pub fn update( &mut self, this_node: Uuid, - context: Option, + context: &Option, new_value: DvvsValue, ) { if let Some(context) = context { @@ -191,7 +191,7 @@ impl TableSchema for K2VItemTable { type E = K2VItem; type Filter = ItemFilter; - fn updated(&self, _old: Option, _new: Option) { + fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { // nothing for now } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index c85a726b..857b494d 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -17,7 +17,8 @@ use garage_rpc::system::System; use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; -use garage_table::Table; +use garage_table::table::TABLE_RPC_TIMEOUT; +use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::item_table::*; @@ -27,8 +28,7 @@ use crate::k2v::item_table::*; pub enum K2VRpc { Ok, InsertItem { - bucket_id: Uuid, - partition_key: String, + partition: K2VItemPartition, sort_key: String, causal_context: Option, value: DvvsValue, @@ -63,15 +63,79 @@ impl K2VRpcHandler { rpc_handler } - async fn handle_insert( + // ---- public interface ---- + + pub async fn insert( &self, bucket_id: Uuid, - partition_key: &str, + partition_key: String, + sort_key: String, + causal_context: Option, + value: DvvsValue, + ) -> Result<(), Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + self.system + .rpc + .try_call_many( + &self.endpoint, + &who[..], + K2VRpc::InsertItem { + partition, + sort_key, + causal_context, + value, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; + + Ok(()) + } + + // ---- internal handlers ---- + + #[allow(clippy::ptr_arg)] + async fn handle_insert( + &self, + partition: &K2VItemPartition, sort_key: &String, causal_context: &Option, value: &DvvsValue, ) -> Result { - unimplemented!() //TODO + let tree_key = self.item_table.data.tree_key(partition, sort_key); + let new = self + .item_table + .data + .update_entry_with(&tree_key[..], |ent| { + let mut ent = ent.unwrap_or_else(|| { + K2VItem::new( + partition.bucket_id, + partition.partition_key.clone(), + sort_key.clone(), + ) + }); + ent.update(self.system.id, causal_context, value.clone()); + ent + })?; + + // Propagate to rest of network + if let Some(updated) = new { + self.item_table.insert(&updated).await?; + } + + Ok(K2VRpc::Ok) } } @@ -80,13 +144,12 @@ impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { K2VRpc::InsertItem { - bucket_id, - partition_key, + partition, sort_key, causal_context, value, } => { - self.handle_insert(*bucket_id, partition_key, sort_key, causal_context, value) + self.handle_insert(partition, sort_key, causal_context, value) .await } m => Err(Error::unexpected_rpc_message(m)), diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index b6945403..9b3991bf 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -51,11 +51,11 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option, new: Option) { + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { #[allow(clippy::or_fun_call)] - let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); + let block = &old.or(new).unwrap().block; + let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { if let Err(e) = self.block_manager.block_incref(block) { warn!("block_incref failed for block {:?}: {}", block, e); diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index df3e5349..3d9a89f7 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -232,8 +232,11 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option, new: Option) { + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { let version_table = self.version_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 9b46936d..ad096772 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -137,8 +137,11 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option, new: Option) { + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { let block_ref_table = self.block_ref_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks diff --git a/src/table/data.rs b/src/table/data.rs index ff7965f5..23ef4b4e 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -20,8 +20,8 @@ use crate::schema::*; pub struct TableData { system: Arc, - pub(crate) instance: F, - pub(crate) replication: R, + pub instance: F, + pub replication: R, pub store: sled::Tree, @@ -136,17 +136,31 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + self.update_entry_with(&tree_key[..], |ent| match ent { + Some(mut ent) => { + ent.merge(&update); + ent + } + None => update.clone(), + })?; + Ok(()) + } + + pub fn update_entry_with( + &self, + tree_key: &[u8], + f: impl Fn(Option) -> F::E, + ) -> Result, Error> { let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| { - let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? { + let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? { Some(old_bytes) => { let old_entry = self .decode_entry(&old_bytes) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); + let new_entry = f(Some(old_entry.clone())); (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, update.clone()), + None => (None, None, f(None)), }; // Scenario 1: the value changed, so of course there is a change @@ -163,8 +177,8 @@ where if value_changed || encoding_changed { let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; + mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?; + store.insert(tree_key.to_vec(), new_bytes)?; Ok(Some((old_entry, new_entry, new_bytes_hash))) } else { Ok(None) @@ -175,7 +189,7 @@ where self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); - self.instance.updated(old_entry, Some(new_entry)); + self.instance.updated(old_entry.as_ref(), Some(&new_entry)); self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -187,12 +201,14 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; } } - } - Ok(()) + Ok(Some(new_entry)) + } else { + Ok(None) + } } pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { @@ -211,7 +227,7 @@ where self.metrics.internal_delete_counter.add(1); let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -235,7 +251,7 @@ where if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; - self.instance.updated(Some(old_entry), None); + self.instance.updated(Some(&old_entry), None); self.merkle_todo_notify.notify_one(); Ok(true) } else { @@ -245,7 +261,7 @@ where // ---- Utility functions ---- - pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { + pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret diff --git a/src/table/schema.rs b/src/table/schema.rs index eba918a2..37327037 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -86,7 +86,7 @@ pub trait TableSchema: Send + Sync { // as the update itself is an unchangeable fact that will never go back // due to CRDT logic. Typically errors in propagation of info should be logged // to stderr. - fn updated(&self, _old: Option, _new: Option) {} + fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {} fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/table/table.rs b/src/table/table.rs index 7f87a449..f3e5b881 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -27,7 +27,7 @@ use crate::replication::*; use crate::schema::*; use crate::sync::*; -const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table { pub system: Arc,