From da14343ea7ddf176588f4d6acdbe98a594a85ab7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 14 Apr 2022 16:42:18 +0200 Subject: [PATCH] prepare k2v rpc --- src/block/manager.rs | 2 +- src/model/garage.rs | 7 ++- src/model/k2v/causality.rs | 4 +- src/model/k2v/mod.rs | 4 +- src/model/k2v/rpc.rs | 95 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 108 insertions(+), 4 deletions(-) create mode 100644 src/model/k2v/rpc.rs diff --git a/src/block/manager.rs b/src/block/manager.rs index 1c04a335..9b2d9cad 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -132,7 +132,7 @@ impl BlockManager { let endpoint = system .netapp - .endpoint("garage_model/block.rs/Rpc".to_string()); + .endpoint("garage_block/manager.rs/Rpc".to_string()); let manager_locked = BlockManagerLocked(); diff --git a/src/model/garage.rs b/src/model/garage.rs index 7132ca37..3d538ecd 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -14,6 +14,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::k2v::item_table::*; +use crate::k2v::rpc::*; use crate::s3::block_ref_table::*; use crate::s3::object_table::*; use crate::s3::version_table::*; @@ -53,6 +54,8 @@ pub struct Garage { /// Table containing K2V items pub k2v_item_table: Arc>, + /// K2V RPC handler + pub k2v_rpc: Arc, } impl Garage { @@ -150,8 +153,9 @@ impl Garage { &db, ); - // ---- K2V tables ---- + // ---- K2V ---- let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db); + let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone()); info!("Initialize Garage..."); @@ -168,6 +172,7 @@ impl Garage { version_table, block_ref_table, k2v_item_table, + k2v_rpc, }) } diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 848b200e..3e7d4a46 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,6 +1,8 @@ use std::collections::BTreeMap; use std::convert::TryInto; +use serde::{Deserialize, Serialize}; + use garage_util::data::*; use garage_util::error::*; @@ -14,7 +16,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId { u64::from_be_bytes(tmp) } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Serialize, Deserialize)] pub struct CausalContext { pub vector_clock: BTreeMap, } diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 4d269624..d6531764 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,3 +1,5 @@ +pub mod causality; + pub mod item_table; -pub mod causality; +pub mod rpc; diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs new file mode 100644 index 00000000..c85a726b --- /dev/null +++ b/src/model/k2v/rpc.rs @@ -0,0 +1,95 @@ +//! Module that implements RPCs specific to K2V. +//! This is necessary for insertions into the K2V store, +//! as they have to be transmitted to one of the nodes responsible +//! for storing the entry to be processed (the API entry +//! node does not process the entry directly, as this would +//! mean the vector clock gets much larger than needed). + +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; +use garage_util::error::*; + +use garage_rpc::system::System; +use garage_rpc::*; + +use garage_table::replication::{TableReplication, TableShardedReplication}; +use garage_table::Table; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; + +/// RPC messages for K2V +#[derive(Debug, Serialize, Deserialize)] +pub enum K2VRpc { + Ok, + InsertItem { + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: Option, + value: DvvsValue, + }, +} + +impl Rpc for K2VRpc { + type Response = Result; +} + +/// The block manager, handling block exchange between nodes, and block storage on local node +pub struct K2VRpcHandler { + system: Arc, + item_table: Arc>, + endpoint: Arc>, +} + +impl K2VRpcHandler { + pub fn new( + system: Arc, + item_table: Arc>, + ) -> Arc { + let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); + + let rpc_handler = Arc::new(Self { + system, + item_table, + endpoint, + }); + rpc_handler.endpoint.set_handler(rpc_handler.clone()); + + rpc_handler + } + + async fn handle_insert( + &self, + bucket_id: Uuid, + partition_key: &str, + sort_key: &String, + causal_context: &Option, + value: &DvvsValue, + ) -> Result { + unimplemented!() //TODO + } +} + +#[async_trait] +impl EndpointHandler for K2VRpcHandler { + async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { + match message { + K2VRpc::InsertItem { + bucket_id, + partition_key, + sort_key, + causal_context, + value, + } => { + self.handle_insert(*bucket_id, partition_key, sort_key, causal_context, value) + .await + } + m => Err(Error::unexpected_rpc_message(m)), + } + } +}