K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
5 changed files with 108 additions and 4 deletions
Showing only changes of commit da14343ea7 - Show all commits

View file

@ -132,7 +132,7 @@ impl BlockManager {
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint("garage_model/block.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked(); let manager_locked = BlockManagerLocked();

View file

@ -14,6 +14,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::k2v::item_table::*; use crate::k2v::item_table::*;
use crate::k2v::rpc::*;
use crate::s3::block_ref_table::*; use crate::s3::block_ref_table::*;
use crate::s3::object_table::*; use crate::s3::object_table::*;
use crate::s3::version_table::*; use crate::s3::version_table::*;
@ -53,6 +54,8 @@ pub struct Garage {
/// Table containing K2V items /// Table containing K2V items
pub k2v_item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, pub k2v_item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
/// K2V RPC handler
pub k2v_rpc: Arc<K2VRpcHandler>,
} }
impl Garage { impl Garage {
@ -150,8 +153,9 @@ impl Garage {
&db, &db,
); );
// ---- K2V tables ---- // ---- K2V ----
let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db); let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db);
let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone());
info!("Initialize Garage..."); info!("Initialize Garage...");
@ -168,6 +172,7 @@ impl Garage {
version_table, version_table,
block_ref_table, block_ref_table,
k2v_item_table, k2v_item_table,
k2v_rpc,
}) })
} }

View file

@ -1,6 +1,8 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::convert::TryInto; use std::convert::TryInto;
use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
@ -14,7 +16,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp) u64::from_be_bytes(tmp)
} }
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug, Serialize, Deserialize)]
pub struct CausalContext { pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>, pub vector_clock: BTreeMap<K2VNodeId, u64>,
} }

View file

@ -1,3 +1,5 @@
pub mod causality;
pub mod item_table; pub mod item_table;
pub mod causality; pub mod rpc;

95
src/model/k2v/rpc.rs Normal file
View file

@ -0,0 +1,95 @@
//! Module that implements RPCs specific to K2V.
//! This is necessary for insertions into the K2V store,
//! as they have to be transmitted to one of the nodes responsible
//! for storing the entry to be processed (the API entry
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_util::error::*;
use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::Table;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
pub enum K2VRpc {
Ok,
InsertItem {
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causal_context: Option<CausalContext>,
value: DvvsValue,
},
}
impl Rpc for K2VRpc {
type Response = Result<K2VRpc, Error>;
}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
}
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
) -> Arc<Self> {
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
endpoint,
});
rpc_handler.endpoint.set_handler(rpc_handler.clone());
rpc_handler
}
async fn handle_insert(
&self,
bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
causal_context: &Option<CausalContext>,
value: &DvvsValue,
) -> Result<K2VRpc, Error> {
unimplemented!() //TODO
}
}
#[async_trait]
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
K2VRpc::InsertItem {
bucket_id,
partition_key,
sort_key,
causal_context,
value,
} => {
self.handle_insert(*bucket_id, partition_key, sort_key, causal_context, value)
.await
}
m => Err(Error::unexpected_rpc_message(m)),
}
}
}