diff --git a/Cargo.lock b/Cargo.lock index b5dd999f..58e4046b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,6 +292,7 @@ dependencies = [ "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "reduce 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", "serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -718,6 +719,11 @@ name = "redox_syscall" version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "reduce" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "rmp" version = "0.8.9" @@ -1129,6 +1135,7 @@ dependencies = [ "checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" "checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +"checksum reduce 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "75b1fa5668b02f2a69746bba558f8f98cc087b123a587fd959122872ad9a3f3c" "checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" "checksum rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1ee98f14fe8b8e9c5ea13d25da7b2a1796169202c57a09d7288de90d56222b" "checksum rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6" diff --git a/Cargo.toml b/Cargo.toml index 51bdd070..f9c15725 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,4 @@ rand = "0.7" hex = "0.3" sha2 = "0.8" async-trait = "0.1.30" +reduce = "0.1.2" diff --git a/src/error.rs b/src/error.rs index 30f7dac6..0cfafca3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,9 @@ pub enum Error { #[error(display = "Invalid HTTP header value: {}", _0)] HTTPHeader(#[error(source)] http::header::ToStrError), + #[error(display = "Sled error: {}", _0)] + Sled(#[error(source)] sled::Error), + #[error(display = "Messagepack encode error: {}", _0)] RMPEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] @@ -32,6 +35,9 @@ pub enum Error { #[error(display = "{}", _0)] BadRequest(String), + #[error(display = "Entry not found")] + NotFound, + #[error(display = "{}", _0)] Message(String), } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index eda300c4..0ac26141 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,7 +1,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use futures_util::future::FutureExt; use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; diff --git a/src/server.rs b/src/server.rs index 31f1cc28..1f1ac2af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use futures::channel::oneshot; use serde::Deserialize; +use tokio::sync::RwLock; use crate::data::*; use crate::proto::*; @@ -24,7 +25,7 @@ pub struct Garage { } impl Garage { - pub fn new(config: Config, id: UUID, db: sled::Db) -> Self { + pub async fn new(config: Config, id: UUID, db: sled::Db) -> Arc { let system = Arc::new(System::new(config, id)); let meta_rep_param = TableReplicationParams{ @@ -35,6 +36,7 @@ impl Garage { }; let version_table = Arc::new(Table::new( + VersionTable{garage: RwLock::new(None)}, system.clone(), &db, "version".to_string(), @@ -49,6 +51,9 @@ impl Garage { garage.table_rpc_handlers.insert( garage.version_table.name.clone(), garage.version_table.clone().rpc_handler()); + + let garage = Arc::new(garage); + *garage.version_table.instance.garage.write().await = Some(garage.clone()); garage } } @@ -139,7 +144,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .expect("Unable to read or generate node ID"); println!("Node ID: {}", hex::encode(&id)); - let garage = Arc::new(Garage::new(config, id, db)); + let garage = Garage::new(config, id, db).await; let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); diff --git a/src/table.rs b/src/table.rs index 5c8e93a5..f45f48c2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,8 @@ -use std::marker::PhantomData; use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -12,7 +12,7 @@ use crate::rpc_client::*; pub struct Table { - phantom: PhantomData, + pub instance: F, pub name: String, @@ -49,9 +49,11 @@ impl TableRpcHandler for TableRpcHandlerAdapter { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub enum TableRPC { - Update(F::K, F::V), + Ok, + Read(Vec), + Update(Vec<(F::K, F::V)>), } pub struct Partition { @@ -70,18 +72,18 @@ pub trait ValueMerge { #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } impl Table { - pub fn new(system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { + pub fn new(instance: F, system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { let store = db.open_tree(&name) .expect("Unable to open DB tree"); Self{ - phantom: PhantomData::default(), + instance, name, system, store, @@ -95,22 +97,112 @@ impl Table { } pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - unimplemented!(); - let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let msg = rmp_serde::encode::to_vec_named(&TableRPC::::Update(k.clone(), v.clone()))?; - rpc_try_call_many(self.system.clone(), - &who[..], - &Message::TableRPC(self.name.to_string(), msg), - self.param.write_quorum, - self.param.timeout).await?; + let rpc = &TableRPC::::Update(vec![(k.clone(), v.clone())]); + + self.rpc_try_call_many(&who[..], + &rpc, + self.param.write_quorum).await?; Ok(()) } + pub async fn get(&self, k: &F::K) -> Result { + let hash = k.hash(); + let who = self.system.members.read().await + .walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::::Read(vec![k.clone()]); + let resps = self.rpc_try_call_many(&who[..], + &rpc, + self.param.read_quorum) + .await?; + + let mut values = vec![]; + for resp in resps { + if let TableRPC::Update(mut pairs) = resp { + if pairs.len() == 0 { + continue; + } else if pairs.len() == 1 && pairs[0].0 == *k { + values.push(pairs.drain(..).next().unwrap().1); + continue; + } + } + return Err(Error::Message(format!("Invalid return value to read"))); + } + values.drain(..) + .reduce(|mut x, y| { x.merge(&y); x }) + .map(Ok) + .unwrap_or(Err(Error::NotFound)) + } + + async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC, quorum: usize) -> Result>, Error> { + let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resps = rpc_try_call_many(self.system.clone(), + who, + &rpc_msg, + quorum, + self.param.timeout).await?; + + let mut resps_vals = vec![]; + for resp in resps { + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); + continue; + } + } + return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + Ok(resps_vals) + } + async fn handle(&self, msg: TableRPC) -> Result, Error> { - unimplemented!() + match msg { + TableRPC::Read(keys) => { + Ok(TableRPC::Update(self.handle_read(&keys)?)) + } + TableRPC::Update(pairs) => { + self.handle_write(pairs).await?; + Ok(TableRPC::Ok) + } + _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + } + } + + fn handle_read(&self, keys: &[F::K]) -> Result, Error> { + let mut results = vec![]; + for key in keys.iter() { + if let Some(bytes) = self.store.get(&key.hash())? { + let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; + results.push(pair); + } + } + Ok(results) + } + + async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + for mut pair in pairs.drain(..) { + let hash = pair.0.hash(); + + let old_val = match self.store.get(&hash)? { + Some(prev_bytes) => { + let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; + pair.1.merge(&old_val); + Some(old_val) + } + None => None + }; + + let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + self.store.insert(&hash, new_bytes)?; + + self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + } + Ok(()) } } diff --git a/src/version_table.rs b/src/version_table.rs index d857ac12..86086421 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use tokio::sync::RwLock; use crate::data::*; use crate::table::*; -use crate::membership::System; +use crate::server::Garage; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct VersionMetaKey { pub bucket: String, pub key: String, @@ -33,7 +34,7 @@ pub enum VersionData { } pub struct VersionTable { - system: Arc, + pub garage: RwLock>>, } impl KeyHash for VersionMetaKey {