use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::rpc::membership::{Ring, System}; use crate::rpc::rpc_client::*; use crate::rpc::rpc_server::*; use crate::table::table_sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct Table { pub instance: F, pub replication: R, pub name: String, pub rpc_client: Arc>>, pub system: Arc, pub store: sled::Tree, pub syncer: ArcSwapOption>, } #[derive(Serialize, Deserialize)] pub enum TableRPC { Ok, ReadEntry(F::P, F::S), ReadEntryResponse(Option), // Read range: read all keys in partition P, possibly starting at a certain sort key offset ReadRange(F::P, Option, Option, usize), Update(Vec>), SyncRPC(SyncRPC), } impl RpcMessage for TableRPC {} pub trait PartitionKey { fn hash(&self) -> Hash; } pub trait SortKey { fn sort_key(&self) -> &[u8]; } pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; fn merge(&mut self, other: &Self); } #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct EmptyKey; impl SortKey for EmptyKey { fn sort_key(&self) -> &[u8] { &[] } } impl PartitionKey for EmptyKey { fn hash(&self) -> Hash { [0u8; 32].into() } } impl> PartitionKey for T { fn hash(&self) -> Hash { hash(self.as_ref().as_bytes()) } } impl> SortKey for T { fn sort_key(&self) -> &[u8] { self.as_ref().as_bytes() } } impl PartitionKey for Hash { fn hash(&self) -> Hash { self.clone() } } impl SortKey for Hash { fn sort_key(&self) -> &[u8] { self.as_slice() } } #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option, new: Option) -> Result<(), Error>; fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } } pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods // Which nodes to send reads from fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; fn read_quorum(&self) -> usize; // Which nodes to send writes to fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; fn epidemic_writes(&self) -> bool; // Which are the nodes that do actually replicate the data fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; fn split_points(&self, ring: &Ring) -> Vec; } impl Table where F: TableSchema + 'static, R: TableReplication + 'static, { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub async fn new( instance: F, replication: R, system: Arc, db: &sled::Db, name: String, rpc_server: &mut RpcServer, ) -> Arc { let store = db.open_tree(&name).expect("Unable to open DB tree"); let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); let table = Arc::new(Self { instance, replication, name, rpc_client, system, store, syncer: ArcSwapOption::from(None), }); table.clone().register_handler(rpc_server, rpc_path); let syncer = TableSyncer::launch(table.clone()).await; table.syncer.swap(Some(syncer)); table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self.replication.write_nodes(&hash, &self.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); self.rpc_client .try_call_many( &who[..], rpc, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { let mut call_list = HashMap::new(); for entry in entries.iter() { let hash = entry.partition_key().hash(); let who = self.replication.write_nodes(&hash, &self.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { call_list.insert(node, vec![]); } call_list.get_mut(&node).unwrap().push(e_enc.clone()); } } let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); let mut errors = vec![]; while let Some(resp) = resps.next().await { if let Err(e) = resp { errors.push(e); } } if errors.len() > self.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) } } pub async fn get( self: &Arc, partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); let who = self.replication.read_nodes(&hash, &self.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .rpc_client .try_call_many( &who[..], rpc, RequestStrategy::with_quorum(self.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; let mut ret = None; let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { if x != v { not_all_same = true; x.merge(&v); } Some(x) } } } } else { return Err(Error::Message(format!("Invalid return value to read"))); } } if let Some(ret_entry) = &ret { if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); self.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } } Ok(ret) } pub async fn get_range( self: &Arc, partition_key: &F::P, begin_sort_key: Option, filter: Option, limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); let who = self.replication.read_nodes(&hash, &self.system); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self .rpc_client .try_call_many( &who[..], rpc, RequestStrategy::with_quorum(self.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) .await?; let mut ret = BTreeMap::new(); let mut to_repair = BTreeMap::new(); for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); } Some(Some(mut prev)) => { let must_repair = prev != entry; prev.merge(&entry); if must_repair { to_repair.insert(entry_key.clone(), Some(prev.clone())); } ret.insert(entry_key, Some(prev)); } Some(None) => unreachable!(), } } } } if !to_repair.is_empty() { let self2 = self.clone(); self.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } Ok(()) }); } let ret_vec = ret .iter_mut() .take(limit) .map(|(_k, v)| v.take().unwrap()) .collect::>(); Ok(ret_vec) } // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), ) .await?; Ok(()) } // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== fn register_handler(self: Arc, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); rpc_server.add_handler::, _, _>(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); let self2 = self.clone(); self.rpc_client .set_local_handler(self.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); } async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { let value = self.handle_read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { let syncer = self.syncer.load_full().unwrap(); let response = syncer .handle_rpc(rpc, self.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), } } fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { Ok(Some(ByteBuf::from(bytes.to_vec()))) } else { Ok(None) } } fn handle_read_range( &self, p: &F::P, s: &Option, filter: &Option, limit: usize, ) -> Result>, Error> { let partition_hash = p.hash(); let first_key = match s { None => partition_hash.to_vec(), Some(sk) => self.tree_key(p, sk), }; let mut ret = vec![]; for item in self.store.range(first_key..) { let (key, value) = item?; if &key[..32] != partition_hash.as_slice() { break; } let keep = match filter { None => true, Some(f) => { let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; F::matches_filter(&entry, f) } }; if keep { ret.push(Arc::new(ByteBuf::from(value.as_ref()))); } if ret.len() >= limit { break; } } Ok(ret) } pub async fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { let syncer = self.syncer.load_full().unwrap(); let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) .map_err(Error::RMPDecode) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); (Some(old_entry), new_entry) } None => (None, update.clone()), }; let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; Ok((old_entry, new_entry)) })?; if old_entry.as_ref() != Some(&new_entry) { if self.replication.epidemic_writes() { epidemic_propagate.push(new_entry.clone()); } self.instance.updated(old_entry, Some(new_entry)).await?; self.system .background .spawn_cancellable(syncer.clone().invalidate(tree_key)); } } if epidemic_propagate.len() > 0 { let self2 = self.clone(); self.system .background .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); } Ok(()) } pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { let syncer = self.syncer.load_full().unwrap(); debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); let mut count = 0; while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { if key.as_ref() < begin.as_slice() { break; } if let Some(old_val) = self.store.remove(&key)? { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); count += 1; } } debug!("({}) {} entries deleted", self.name, count); Ok(()) } fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); ret.extend(s.sort_key()); ret } }