use std::marker::PhantomData; use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; use crate::error::Error; use crate::proto::*; use crate::data::*; use crate::membership::System; use crate::rpc_client::*; pub struct Table { phantom: PhantomData, pub name: String, pub system: Arc, pub store: sled::Tree, pub partitions: Vec, pub param: TableReplicationParams, } #[derive(Clone)] pub struct TableReplicationParams { pub replication_factor: usize, pub read_quorum: usize, pub write_quorum: usize, pub timeout: Duration, } #[async_trait] pub trait TableRpcHandler { async fn handle(&self, rpc: &[u8]) -> Result, Error>; } struct TableRpcHandlerAdapter { table: Arc>, } #[async_trait] impl TableRpcHandler for TableRpcHandlerAdapter { async fn handle(&self, rpc: &[u8]) -> Result, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC>(rpc)?; let rep = self.table.handle(msg).await?; Ok(rmp_serde::encode::to_vec_named(&rep)?) } } #[derive(Debug, Serialize, Deserialize)] pub enum TableRPC { Update(F::K, F::V), } pub struct Partition { pub begin: Hash, pub end: Hash, pub other_nodes: Vec, } pub trait KeyHash { fn hash(&self) -> Hash; } pub trait ValueMerge { fn merge(&mut self, other: &Self); } #[async_trait] pub trait TableFormat: Send + Sync { type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } impl Table { pub fn new(system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { let store = db.open_tree(&name) .expect("Unable to open DB tree"); Self{ phantom: PhantomData::default(), name, system, store, partitions: Vec::new(), param, } } pub fn rpc_handler(self: Arc) -> Box { Box::new(TableRpcHandlerAdapter::{ table: self }) } pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { unimplemented!(); let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); let msg = rmp_serde::encode::to_vec_named(&TableRPC::::Update(k.clone(), v.clone()))?; rpc_try_call_many(self.system.clone(), &who[..], &Message::TableRPC(self.name.to_string(), msg), self.param.write_quorum, self.param.timeout).await?; Ok(()) } async fn handle(&self, msg: TableRPC) -> Result, Error> { unimplemented!() } }