diff --git a/src/block/manager.rs b/src/block/manager.rs
index 1c04a335..9b2d9cad 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -132,7 +132,7 @@ impl BlockManager {
let endpoint = system
.netapp
- .endpoint("garage_model/block.rs/Rpc".to_string());
+ .endpoint("garage_block/manager.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked();
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 7132ca37..3d538ecd 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -14,6 +14,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::k2v::item_table::*;
+use crate::k2v::rpc::*;
use crate::s3::block_ref_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;
@@ -53,6 +54,8 @@ pub struct Garage {
/// Table containing K2V items
pub k2v_item_table: Arc
>,
+ /// K2V RPC handler
+ pub k2v_rpc: Arc,
}
impl Garage {
@@ -150,8 +153,9 @@ impl Garage {
&db,
);
- // ---- K2V tables ----
+ // ---- K2V ----
let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db);
+ let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone());
info!("Initialize Garage...");
@@ -168,6 +172,7 @@ impl Garage {
version_table,
block_ref_table,
k2v_item_table,
+ k2v_rpc,
})
}
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 848b200e..3e7d4a46 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -1,6 +1,8 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
+use serde::{Deserialize, Serialize};
+
use garage_util::data::*;
use garage_util::error::*;
@@ -14,7 +16,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Debug, Serialize, Deserialize)]
pub struct CausalContext {
pub vector_clock: BTreeMap,
}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 4d269624..d6531764 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,3 +1,5 @@
+pub mod causality;
+
pub mod item_table;
-pub mod causality;
+pub mod rpc;
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
new file mode 100644
index 00000000..c85a726b
--- /dev/null
+++ b/src/model/k2v/rpc.rs
@@ -0,0 +1,95 @@
+//! Module that implements RPCs specific to K2V.
+//! This is necessary for insertions into the K2V store,
+//! as they have to be transmitted to one of the nodes responsible
+//! for storing the entry to be processed (the API entry
+//! node does not process the entry directly, as this would
+//! mean the vector clock gets much larger than needed).
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::system::System;
+use garage_rpc::*;
+
+use garage_table::replication::{TableReplication, TableShardedReplication};
+use garage_table::Table;
+
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+
+/// RPC messages for K2V
+#[derive(Debug, Serialize, Deserialize)]
+pub enum K2VRpc {
+ Ok,
+ InsertItem {
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causal_context: Option,
+ value: DvvsValue,
+ },
+}
+
+impl Rpc for K2VRpc {
+ type Response = Result;
+}
+
+/// The block manager, handling block exchange between nodes, and block storage on local node
+pub struct K2VRpcHandler {
+ system: Arc,
+ item_table: Arc>,
+ endpoint: Arc>,
+}
+
+impl K2VRpcHandler {
+ pub fn new(
+ system: Arc,
+ item_table: Arc>,
+ ) -> Arc {
+ let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
+
+ let rpc_handler = Arc::new(Self {
+ system,
+ item_table,
+ endpoint,
+ });
+ rpc_handler.endpoint.set_handler(rpc_handler.clone());
+
+ rpc_handler
+ }
+
+ async fn handle_insert(
+ &self,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &String,
+ causal_context: &Option,
+ value: &DvvsValue,
+ ) -> Result {
+ unimplemented!() //TODO
+ }
+}
+
+#[async_trait]
+impl EndpointHandler for K2VRpcHandler {
+ async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result {
+ match message {
+ K2VRpc::InsertItem {
+ bucket_id,
+ partition_key,
+ sort_key,
+ causal_context,
+ value,
+ } => {
+ self.handle_insert(*bucket_id, partition_key, sort_key, causal_context, value)
+ .await
+ }
+ m => Err(Error::unexpected_rpc_message(m)),
+ }
+ }
+}