From a48e2e0cb2bdc75e14dfde199dbca0a779b1316b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 10:30:59 +0100 Subject: [PATCH 01/16] K2V: Subscription to ranges of items --- src/api/k2v/item.rs | 2 +- src/model/garage.rs | 2 +- src/model/k2v/item_table.rs | 2 +- src/model/k2v/mod.rs | 3 +- src/model/k2v/poll.rs | 50 ----------------- src/model/k2v/rpc.rs | 10 ++-- src/model/k2v/sub.rs | 107 ++++++++++++++++++++++++++++++++++++ 7 files changed, 117 insertions(+), 59 deletions(-) delete mode 100644 src/model/k2v/poll.rs create mode 100644 src/model/k2v/sub.rs diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, diff --git a/src/model/garage.rs b/src/model/garage.rs index ac1846ce..ce479465 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, rpc::*, sub::*}; /// An entire Garage full of data pub struct Garage { diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index ce3e4129..a22df68a 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..83ad2512 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -2,5 +2,6 @@ pub mod causality; pub mod item_table; -pub mod poll; pub mod rpc; + +pub(crate) mod sub; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index 93105207..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Default)] -pub struct SubscriptionManager { - subscriptions: Mutex>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver { - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } -} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..8860676b 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -27,7 +27,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; /// RPC 
messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -181,7 +181,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -288,8 +288,8 @@ impl K2VRpcHandler { }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +326,7 @@ impl EndpointHandler for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..c4273dba --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,107 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option, + pub start: Option, + pub end: Option, +} + +#[derive(Default)] +pub struct SubscriptionManager(Mutex); + +#[derive(Default)] +pub struct SubscriptionManagerInner { + item_subscriptions: HashMap>, + range_subscriptions: HashMap>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.item_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.item_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.range_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.range_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub fn notify(&self, item: &K2VItem) { + let mut inner = self.0.lock().unwrap(); + + // 1. Notify single item subscribers, + // removing subscriptions with no more listeners if any + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + if let Some(s) = inner.item_subscriptions.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.item_subscriptions.remove(&key); + } + } + + // 2. 
Notify range subscribers, + // removing subscriptions with no more listeners if any + inner.range_subscriptions.retain(|sub, chan| { + if sub.matches(&item) { + chan.send(item.clone()).is_ok() + } else { + chan.receiver_count() != 0 + } + }); + } +} + +impl PollRange { + fn matches(&self, item: &K2VItem) -> bool { + item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| item.sort_key < *x) + .unwrap_or(true) + } +} From 9f5419f465de3ddbb9afc91464d0eb00636049b9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 11:01:49 +0100 Subject: [PATCH 02/16] Make K2V item timestamps globally increasing on each node --- src/model/garage.rs | 6 ++++- src/model/k2v/item_table.rs | 7 ++++-- src/model/k2v/rpc.rs | 45 +++++++++++++++++++++++++++++++++---- src/table/data.rs | 24 +++++++++++--------- 4 files changed, 65 insertions(+), 17 deletions(-) diff --git a/src/model/garage.rs b/src/model/garage.rs index ce479465..74add688 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -305,8 +305,10 @@ impl GarageK2V { fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { @@ -317,7 +319,9 @@ impl GarageK2V { system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions); Self { item_table, diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index a22df68a..1ba9bb46 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option, new_value: DvvsValue, - ) { + node_ts: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,7 +99,9 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(t_prev + 1, node_ts + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8860676b..e5497215 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,7 +6,8 @@ //! mean the vector clock gets much larger than needed). 
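
Before the rpc.rs changes below, it is worth isolating the rule this commit adds to `K2VItem::update` above. A minimal standalone sketch follows; `next_timestamp` is an illustrative name, not a function from the patch:

```rust
// The timestamp rule from `K2VItem::update`: a new timestamp must beat both
// the entry's own previous maximum (t_prev) and the node-wide persisted
// timestamp (node_ts). Taking the max of both is what makes timestamps
// increase globally on each node, not just per entry.
fn next_timestamp(t_prev: u64, node_ts: u64) -> u64 {
    std::cmp::max(t_prev + 1, node_ts + 1)
}

fn main() {
    // A fresh entry (t_prev = 0) on a node whose clock already reached 41
    // still gets a timestamp above 41:
    assert_eq!(next_timestamp(0, 41), 42);
    // An entry already ahead of the node clock keeps moving forward:
    assert_eq!(next_timestamp(100, 41), 101);
}
```
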
use std::collections::HashMap; -use std::sync::Arc; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::now_msec; use garage_rpc::system::System; use garage_rpc::*; @@ -29,6 +33,8 @@ use crate::k2v::causality::*; use crate::k2v::item_table::*; use crate::k2v::sub::*; +const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; + /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] enum K2VRpc { @@ -59,6 +65,7 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + local_timestamp_tree: Mutex, endpoint: Arc>, subscriptions: Arc, } @@ -66,14 +73,19 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc, + db: &db::Db, item_table: Arc>, subscriptions: Arc, ) -> Arc { + let local_timestamp_tree = db + .open_tree("k2v_local_timestamp") + .expect("Unable to open DB tree for k2v local timestamp"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + local_timestamp_tree: Mutex::new(local_timestamp_tree), endpoint, subscriptions, }); @@ -273,9 +285,22 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result, Error> { + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_timestamp = tx + .get(&local_timestamp_tree, TIMESTAMP_KEY)? 
+ .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,8 +308,20 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_timestamp = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + std::cmp::max(old_local_timestamp, now), + ); + + tx.insert( + &local_timestamp_tree, + TIMESTAMP_KEY, + u64::to_be_bytes(new_local_timestamp), + )?; + + Ok(ent) }) } diff --git a/src/table/data.rs b/src/table/data.rs index 5c792f1f..26cc3a5a 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -181,13 +181,17 @@ impl TableData { pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&update); - ent - } - None => update.clone(), - })?; + self.update_entry_with( + update.partition_key(), + update.sort_key(), + |_tx, ent| match ent { + Some(mut ent) => { + ent.merge(&update); + Ok(ent) + } + None => Ok(update.clone()), + }, + )?; Ok(()) } @@ -195,7 +199,7 @@ impl TableData { &self, partition_key: &F::P, sort_key: &F::S, - f: impl Fn(Option) -> F::E, + update_fn: impl Fn(&mut db::Transaction, Option) -> db::TxOpResult, ) -> Result, Error> { let tree_key = self.tree_key(partition_key, sort_key); @@ -203,10 +207,10 @@ impl TableData { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = f(Some(old_entry.clone())); + let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, f(None)), + None => (None, None, update_fn(&mut tx, None)?), }; // Changed can be true in two scenarios From 789540ca379efd710f1e2699fd458b4821f36a5b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 11:59:57 +0100 Subject: [PATCH 03/16] Type definition for range seen marker --- src/model/k2v/causality.rs | 39 +++++++++++++---- src/model/k2v/item_table.rs | 2 +- src/model/k2v/mod.rs | 1 + src/model/k2v/seen.rs | 85 +++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 10 deletions(-) create mode 100644 src/model/k2v/seen.rs diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 9a692870..665b02b2 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,3 +1,13 @@ +//! Implements a CausalContext, which is a set of timestamps for each +//! node -- a vector clock --, indicating that the versions with +//! timestamps <= these numbers have been seen and can be +//! overwritten by a subsequent write. +//! +//! The textual representation of a CausalContext, which we call a +//! "causality token", is used in the API and must be sent along with +//! each write or delete operation to indicate the previously seen +//! versions that we want to overwrite or delete. + use std::collections::BTreeMap; use std::convert::TryInto; @@ -9,23 +19,36 @@ use garage_util::data::*; /// of full Garage node IDs which are 256-bit UUIDs. 
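
The abbreviation described in this comment is implemented by `make_node_id` just below. A self-contained sketch of the same idea; the function name and raw `[u8; 32]` input are illustrative, since the patch works on Garage's own `Uuid` type:

```rust
// Keep the first 8 bytes of the 256-bit node UUID and read them as a
// big-endian u64; the remaining 24 bytes are simply dropped.
fn abbreviate_node_id(uuid_bytes: &[u8; 32]) -> u64 {
    let mut tmp = [0u8; 8];
    tmp.copy_from_slice(&uuid_bytes[..8]);
    u64::from_be_bytes(tmp)
}

fn main() {
    let mut uuid = [0u8; 32];
    uuid[6] = 0x01;
    uuid[7] = 0x02;
    assert_eq!(abbreviate_node_id(&uuid), 0x0102);
    uuid[31] = 0xff; // changing the tail does not change the abbreviation
    assert_eq!(abbreviate_node_id(&uuid), 0x0102);
}
```
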
pub type K2VNodeId = u64; +pub type VectorClock = BTreeMap; + pub fn make_node_id(node_id: Uuid) -> K2VNodeId { let mut tmp = [0u8; 8]; tmp.copy_from_slice(&node_id.as_slice()[..8]); u64::from_be_bytes(tmp) } -#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool { + a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0)) +} + +pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock { + let mut ret = a.clone(); + for (n, ts) in b.iter() { + let ent = ret.entry(*n).or_insert(0); + *ent = std::cmp::max(*ts, *ent); + } + ret +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)] pub struct CausalContext { - pub vector_clock: BTreeMap, + pub vector_clock: VectorClock, } impl CausalContext { /// Empty causality context - pub fn new_empty() -> Self { - Self { - vector_clock: BTreeMap::new(), - } + pub fn new() -> Self { + Self::default() } /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { @@ -72,9 +95,7 @@ impl CausalContext { } /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { - self.vector_clock - .iter() - .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + vclock_gt(&self.vector_clock, &other.vector_clock) } } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 1ba9bb46..bc2b1aef 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -106,7 +106,7 @@ impl K2VItem { /// Extract the causality context of a K2V Item pub fn causal_context(&self) -> CausalContext { - let mut cc = CausalContext::new_empty(); + let mut cc = CausalContext::new(); for (node, ent) in self.items.iter() { cc.vector_clock.insert(*node, ent.max_time()); } diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 83ad2512..c488b4c6 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,4 +1,5 @@ pub mod causality; +pub mod seen; pub mod item_table; diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs new file mode 100644 index 00000000..b8f4ff27 --- /dev/null +++ b/src/model/k2v/seen.rs @@ -0,0 +1,85 @@ +//! Implements a RangeSeenMarker, a data type used in the PollRange API +//! to indicate which items in the range have already been seen +//! and which have not been seen yet. +//! +//! It consists of a vector clock that indicates that for each node, +//! all items produced by that node with timestamps <= the value in the +//! vector clock has been seen, as well as a set of causal contexts for +//! individual items. 
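
The seen marker builds directly on the two vector-clock helpers introduced in causality.rs above. Their semantics, condensed into a runnable example (the helper bodies are copied from the patch; the test values are made up):

```rust
use std::collections::BTreeMap;

type VectorClock = BTreeMap<u64, u64>;

// `vclock_gt(a, b)` holds when `a` is strictly ahead of `b` on at least one
// node; `vclock_max` is the entry-wise maximum, i.e. a clock that has "seen"
// everything either input has seen.
fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
    a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
}

fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
    let mut ret = a.clone();
    for (n, ts) in b.iter() {
        let ent = ret.entry(*n).or_insert(0);
        *ent = std::cmp::max(*ts, *ent);
    }
    ret
}

fn main() {
    let a = BTreeMap::from([(1, 3), (2, 1)]);
    let b = BTreeMap::from([(1, 2), (2, 5)]);
    // Each clock is ahead of the other on one node, so neither dominates:
    assert!(vclock_gt(&a, &b) && vclock_gt(&b, &a));
    // The merged clock dominates both inputs:
    assert_eq!(vclock_max(&a, &b), BTreeMap::from([(1, 3), (2, 5)]));
}
```
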
+ +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::Uuid; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; +use garage_util::error::{Error, OkOrMessage}; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct RangeSeenMarker { + vector_clock: VectorClock, + items: BTreeMap, +} + +impl RangeSeenMarker { + pub fn new() -> Self { + Self::default() + } + + pub fn mark_seen_node_items<'a, I: IntoIterator>( + &mut self, + node: Uuid, + items: I, + ) { + let node = make_node_id(node); + for item in items.into_iter() { + let cc = item.causal_context(); + + if let Some(ts) = cc.vector_clock.get(&node) { + let ent = self.vector_clock.entry(node).or_insert(0); + *ent = std::cmp::max(*ent, *ts); + } + + if vclock_gt(&cc.vector_clock, &self.vector_clock) { + match self.items.get_mut(&item.sort_key) { + None => { + self.items.insert(item.sort_key.clone(), cc.vector_clock); + } + Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock), + } + } + } + } + + pub fn canonicalize(&mut self) { + let self_vc = &self.vector_clock; + self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc)) + } + + pub fn encode(&mut self) -> Result { + self.canonicalize(); + + let bytes = nonversioned_encode(&self)?; + let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(base64::encode(&bytes)) + } + + pub fn decode(s: &str) -> Result { + let bytes = base64::decode(&s).ok_or_message("invalid base64")?; + let bytes = zstd::stream::decode_all(&mut &bytes[..])?; + Ok(nonversioned_decode(&bytes)?) + } + + pub fn is_new_item(&self, item: &K2VItem) -> bool { + let cc = item.causal_context(); + vclock_gt(&cc.vector_clock, &self.vector_clock) + && self + .items + .get(&item.sort_key) + .map(|vc| vclock_gt(&cc.vector_clock, &vc)) + .unwrap_or(true) + } +} From 43fd6c1526339fb444e5e25021f42d0aee252b27 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 12:54:24 +0100 Subject: [PATCH 04/16] PollRange RPC --- src/model/k2v/rpc.rs | 133 ++++++++++++++++++++++++++++++++++++++++-- src/model/k2v/seen.rs | 13 +++++ src/model/k2v/sub.rs | 31 +++++----- 3 files changed, 159 insertions(+), 18 deletions(-) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index e5497215..8b070885 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -5,7 +5,7 @@ //! node does not process the entry directly, as this would //! mean the vector clock gets much larger than needed). 
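
The PollRange RPC added in this commit races the long-poll future against a timeout, the same way patch 01 did for single items. A condensed sketch of that pattern, assuming a tokio runtime with the `macros` and `time` features; `poll_with_timeout` is an illustrative helper, not code from the patch:

```rust
use std::future::Future;
use std::time::Duration;

// Race a long-poll future against a sleep, and turn expiry into an explicit
// "nothing changed" value rather than an error.
async fn poll_with_timeout<F: Future<Output = String>>(
    poll: F,
    timeout: Duration,
) -> Option<String> {
    tokio::select! {
        v = poll => Some(v),
        _ = tokio::time::sleep(timeout) => None, // expired: no change seen
    }
}

#[tokio::main]
async fn main() {
    // A change arriving after 10 ms beats a 100 ms timeout...
    let fast = async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        "new value".to_string()
    };
    assert!(poll_with_timeout(fast, Duration::from_millis(100))
        .await
        .is_some());

    // ...while one that would take a full second loses against it.
    let slow = async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        "too late".to_string()
    };
    assert!(poll_with_timeout(slow, Duration::from_millis(100))
        .await
        .is_none());
}
```
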
-use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -31,6 +31,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::seen::*; use crate::k2v::sub::*; const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; @@ -46,7 +47,13 @@ enum K2VRpc { causal_context: CausalContext, timeout_msec: u64, }, + PollRange { + range: PollRange, + seen_str: Option, + timeout_msec: u64, + }, PollItemResponse(Option), + PollRangeResponse(Uuid, Vec), } #[derive(Debug, Serialize, Deserialize)] @@ -242,9 +249,7 @@ impl K2VRpcHandler { resp = Some(x); } } - K2VRpc::PollItemResponse(None) => { - return Ok(None); - } + K2VRpc::PollItemResponse(None) => (), v => return Err(Error::unexpected_rpc_message(v)), } } @@ -252,6 +257,69 @@ impl K2VRpcHandler { Ok(resp) } + pub async fn poll_range( + &self, + range: PollRange, + seen_str: Option, + timeout_msec: u64, + ) -> Result, String)>, Error> { + let mut seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode) + .transpose()? + .unwrap_or_default(); + seen.restrict(&range); + + let nodes = self + .item_table + .data + .replication + .write_nodes(&range.partition.hash()); + + let rpc = self.system.rpc.try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .without_timeout(), + ); + let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let resps = select! { + r = rpc => r?, + _ = tokio::time::sleep(timeout_duration) => return Ok(None), + }; + + let mut new_items = BTreeMap::::new(); + for v in resps { + if let K2VRpc::PollRangeResponse(node, items) = v { + seen.mark_seen_node_items(node, items.iter()); + for item in items.into_iter() { + match new_items.get_mut(&item.sort_key) { + Some(ent) => { + ent.merge(&item); + } + None => { + new_items.insert(item.sort_key.clone(), item); + } + } + } + } else { + return Err(Error::unexpected_rpc_message(v)); + } + } + + if new_items.is_empty() { + Ok(None) + } else { + Ok(Some((new_items, seen.encode()?))) + } + } + // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { @@ -348,6 +416,52 @@ impl K2VRpcHandler { Ok(value) } + + async fn handle_poll_range( + &self, + range: &PollRange, + seen_str: &Option, + ) -> Result, Error> { + let seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode) + .transpose()? + .unwrap_or_default(); + let mut new_items = vec![]; + + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Read current state of the specified range to check new items + let partition_hash = range.partition.hash(); + let first_key = match &range.start { + None => partition_hash.to_vec(), + Some(sk) => self.item_table.data.tree_key(&range.partition, sk), + }; + for item in self.item_table.data.store.range(first_key..)? 
{ + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let item = self.item_table.data.decode_entry(&value)?; + if !range.matches(&item) { + break; + } + if seen.is_new_item(&item) { + new_items.push(item); + } + } + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } } #[async_trait] @@ -367,6 +481,17 @@ impl EndpointHandler for K2VRpcHandler { _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! { + ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)), + _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])), + } + } m => Err(Error::unexpected_rpc_message(m)), } } diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index b8f4ff27..d2cd54c7 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -17,6 +17,7 @@ use garage_util::error::{Error, OkOrMessage}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::sub::*; #[derive(Debug, Serialize, Deserialize, Default)] pub struct RangeSeenMarker { @@ -29,6 +30,18 @@ impl RangeSeenMarker { Self::default() } + pub fn restrict(&mut self, range: &PollRange) { + if let Some(start) = &range.start { + self.items = self.items.split_off(start); + } + if let Some(end) = &range.end { + self.items.split_off(end); + } + if let Some(pfx) = &range.prefix { + self.items.retain(|k, _v| k.starts_with(pfx)); + } + } + pub fn mark_seen_node_items<'a, I: IntoIterator>( &mut self, node: Uuid, diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs index c4273dba..b1daa271 100644 --- a/src/model/k2v/sub.rs +++ b/src/model/k2v/sub.rs @@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex); #[derive(Default)] pub struct SubscriptionManagerInner { item_subscriptions: HashMap>, - range_subscriptions: HashMap>, + part_subscriptions: HashMap>, } impl SubscriptionManager { @@ -34,7 +34,7 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); if let Some(s) = inner.item_subscriptions.get(key) { s.subscribe() @@ -45,18 +45,21 @@ impl SubscriptionManager { } } - pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); - if let Some(s) = inner.range_subscriptions.get(key) { + if let Some(s) = inner.part_subscriptions.get(part) { s.subscribe() } else { let (tx, rx) = broadcast::channel(8); - inner.range_subscriptions.insert(key.clone(), tx); + inner.part_subscriptions.insert(part.clone(), tx); rx } } - pub fn notify(&self, item: &K2VItem) { + pub(crate) fn notify(&self, item: &K2VItem) { let mut inner = self.0.lock().unwrap(); // 1. Notify single item subscribers, @@ -73,20 +76,20 @@ impl SubscriptionManager { } } - // 2. Notify range subscribers, + // 2. 
Notify partition subscribers, // removing subscriptions with no more listeners if any - inner.range_subscriptions.retain(|sub, chan| { - if sub.matches(&item) { - chan.send(item.clone()).is_ok() - } else { - chan.receiver_count() != 0 + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); } - }); + } } } impl PollRange { - fn matches(&self, item: &K2VItem) -> bool { + pub fn matches(&self, item: &K2VItem) -> bool { item.partition == self.partition && self .prefix From 57eabe787948ad6c23eae7761d02545c675bf7ff Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 14:56:57 +0100 Subject: [PATCH 05/16] Add proposal spec for PollRange API endpoint --- doc/drafts/k2v-spec.md | 57 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/doc/drafts/k2v-spec.md b/doc/drafts/k2v-spec.md index 9d41b2c0..a335aee3 100644 --- a/doc/drafts/k2v-spec.md +++ b/doc/drafts/k2v-spec.md @@ -706,6 +706,63 @@ HTTP/1.1 200 OK ] ``` +**PollRange: `POST //?poll_range`**, or alternatively
+**PollRange: `SEARCH //?poll_range`** + +Polls a range of items for changes. + +The query body is a JSON object consisting of the following fields: + +| name | default value | meaning | +|-----------------|---------------|----------------------------------------------------------------------------------------| +| `prefix` | `null` | Restrict items to poll to those whose sort keys start with this prefix | +| `start` | `null` | The sort key of the first item to poll | +| `end` | `null` | The sort key of the last item to poll (excluded) | +| `timeout` | 300 | The timeout before 304 NOT MODIFIED is returned if no value in the range is updated | +| `seenMarker` | `null` | An opaque string returned by a previous PollRange call, that represents items already seen | + +The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes). + +The response is either: + +- A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred + +- A HTTP 200 response, indicating that some changes have occurred since the last PollRange call, in which case a JSON object is returned in the body with the following fields: + +| name | meaning | +|-----------------|----------------------------------------------------------------------------------------| +| `seenMarker` | An opaque string that represents items already seen for future PollRange calls | +| `items` | The list of items that have changed since last PollRange call, in the same format as ReadBatch | + + +Example query: + +```json +SEARCH /my_bucket?poll_range HTTP/1.1 + +{ + "prefix": "0391.", + "start": "0391.000001973107", + "seenMarker": "opaquestring123", +} +``` + + +Example response: + +```json +HTTP/1.1 200 OK +Content-Type: application/json + +{ + "seenMarker": "opaquestring456", + "items": [ + { sk: "0391.000001973221", ct: "opaquetoken123", v: ["b64cryptoblob123", "b64cryptoblob'123"] }, + { sk: "0391.000001974191", ct: "opaquetoken456", v: ["b64cryptoblob456", "b64cryptoblob'456"] }, + ] +} +``` + ## Internals: causality tokens From b83517d521b1bea7585ce45a803fad373f28225c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 15:22:25 +0100 Subject: [PATCH 06/16] Implement PollRange API endpoint --- src/api/k2v/api_server.rs | 3 ++ src/api/k2v/batch.rs | 79 ++++++++++++++++++++++++++++++++++----- src/api/k2v/index.rs | 9 ++--- src/api/k2v/item.rs | 4 +- src/api/k2v/router.rs | 8 +++- src/model/k2v/mod.rs | 2 +- 6 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 084867b5..bb85b2e7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer { Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::PollRange { partition_key } => { + handle_poll_range(garage, bucket_id, &partition_key, req).await + } Endpoint::Options => unreachable!(), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..be3fba07 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_table::{EnumerationOrder, TableSchema}; @@ -65,10 +64,7 @@ pub async fn handle_read_batch( 
resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( @@ -160,10 +156,7 @@ pub async fn handle_delete_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( @@ -257,6 +250,53 @@ async fn handle_delete_batch_query( }) } +pub(crate) async fn handle_poll_range( + garage: Arc, + bucket_id: Uuid, + partition_key: &str, + req: Request, +) -> Result, Error> { + use garage_model::k2v::sub::PollRange; + + let query = parse_json_body::(req).await?; + + let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + + let resp = garage + .k2v + .rpc + .poll_range( + PollRange { + partition: K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + start: query.start, + end: query.end, + prefix: query.prefix, + }, + query.seen_marker, + timeout_msec, + ) + .await?; + + if let Some((items, seen_marker)) = resp { + let resp = PollRangeResponse { + items: items + .into_iter() + .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .collect::>(), + seen_marker, + }; + + Ok(json_ok_response(&resp)?) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -361,3 +401,24 @@ struct DeleteBatchResponse { #[serde(rename = "deletedItems")] deleted_items: usize, } + +#[derive(Deserialize)] +struct PollRangeQuery { + #[serde(default)] + prefix: Option, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, + #[serde(default)] + timeout: Option, + #[serde(default, rename = "seenMarker")] + seen_marker: Option, +} + +#[derive(Serialize)] +struct PollRangeResponse { + items: Vec, + #[serde(rename = "seenMarker")] + seen_marker: String, +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 210950bf..6c1d4a91 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use hyper::{Body, Response, StatusCode}; +use hyper::{Body, Response}; use serde::Serialize; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_rpc::ring::Ring; use garage_table::util::*; @@ -12,6 +11,7 @@ use garage_table::util::*; use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use crate::helpers::*; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -68,10 +68,7 @@ pub async fn handle_read_index( next_start, }; - let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resp)?) 
} #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 9b78bc07..ebf34723 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -208,6 +208,8 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000; + let item = garage .k2v .rpc @@ -216,7 +218,7 @@ pub async fn handle_poll_item( partition_key, sort_key, causal_context, - timeout_secs.unwrap_or(300) * 1000, + timeout_msec, ) .await?; diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index e7a3dd69..1cc58be5 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -32,6 +32,9 @@ pub enum Endpoint { causality_token: String, timeout: Option, }, + PollRange { + partition_key: String, + }, ReadBatch { }, ReadIndex { @@ -113,6 +116,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => ReadBatch, @@ -142,6 +146,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => InsertBatch, @@ -234,7 +239,8 @@ impl Endpoint { generateQueryParameters! { keywords: [ "delete" => DELETE, - "search" => SEARCH + "search" => SEARCH, + "poll_range" => POLL_RANGE ], fields: [ "prefix" => prefix, diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index c488b4c6..acc1fcdc 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -5,4 +5,4 @@ pub mod item_table; pub mod rpc; -pub(crate) mod sub; +pub mod sub; From de1111076b28add706bded9fb738a1ef89ee0882 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 10:04:41 +0100 Subject: [PATCH 07/16] PollRange integration test --- src/garage/tests/k2v/poll.rs | 170 ++++++++++++++++++++++++++++++++++- 1 file changed, 168 insertions(+), 2 deletions(-) diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index e56705ae..f54cc5d4 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,12 +1,16 @@ use hyper::{Method, StatusCode}; use std::time::Duration; +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; use crate::common; #[tokio::test] -async fn test_poll() { +async fn test_poll_item() { let ctx = common::context(); - let bucket = ctx.create_bucket("test-k2v-poll"); + let bucket = ctx.create_bucket("test-k2v-poll-item"); // Write initial value let res = ctx @@ -96,3 +100,165 @@ async fn test_poll() { .to_vec(); assert_eq!(poll_res_body, b"New value"); } + +#[tokio::test] +async fn test_poll_range() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll-range"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Initial poll 
range, retrieve single item and first seen_marker + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::) + .body(b"{}".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let json_res = json_body(res2).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_json_eq!( + json_res, + json!( + { + "items": [ + {"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]}, + ], + "seenMarker": seen_marker, + } + ) + ); + + // Second poll range, which will complete later + let poll = { + let bucket = bucket.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test1")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([base64::encode(b"New value")]) + ); + + // Now we will add a value on a different key + // Start a new poll operation + let poll = { + let bucket = bucket.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write value on different key + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test2")) + .body(b"Other value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test2")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([base64::encode(b"Other value")]) + ); +} From 32aab06929f09ce69bc49c4737b4801dd31a3b6f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 11:12:16 +0100 Subject: [PATCH 08/16] k2v-client libary poll_range and CLI poll-range --- Cargo.lock | 3 +- Cargo.nix | 9 +- src/k2v-client/Cargo.toml | 3 +- src/k2v-client/bin/k2v-cli.rs | 203 +++++++++++++++++++++++++--------- src/k2v-client/lib.rs | 89 ++++++++++++++- 5 files changed, 248 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb8b4f5c..83bc90e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1752,12 +1752,13 @@ dependencies = [ [[package]] name = "k2v-client" -version = "0.0.1" +version = "0.1.1" dependencies = [ "base64", "clap 3.1.18", "garage_util", "http", + "hyper-rustls 0.23.0", "log", "rusoto_core", "rusoto_credential", diff --git a/Cargo.nix b/Cargo.nix index 79601cdd..1bf2bb06 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9"; + nixifiedLockHash = "8f036894ab81a528f76e97e904ff3e496a9b1500569312489d444f615fb781bf"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -65,7 +65,7 @@ in garage_api = rustPackages.unknown.garage_api."0.8.1"; garage_web = rustPackages.unknown.garage_web."0.8.1"; garage = rustPackages.unknown.garage."0.8.1"; - k2v-client = rustPackages.unknown.k2v-client."0.0.1"; + k2v-client = rustPackages.unknown.k2v-client."0.1.1"; }; "registry+https://github.com/rust-lang/crates.io-index".addr2line."0.17.0" = overridableMkRustCrate (profileName: rec { name = "addr2line"; @@ -2434,9 +2434,9 @@ in }; }); - "unknown".k2v-client."0.0.1" = overridableMkRustCrate (profileName: rec { + "unknown".k2v-client."0.1.1" = overridableMkRustCrate (profileName: rec { name = "k2v-client"; - version = "0.0.1"; + version = "0.1.1"; registry = "unknown"; src = fetchCrateLocal (workspaceSrc + "/src/k2v-client"); features = builtins.concatLists [ @@ -2449,6 +2449,7 @@ in ${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."3.1.18" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? 
"k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out; + hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.0" { inherit profileName; }).out; log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }).out; rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out; rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index f57ce849..d95a1b3f 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k2v-client" -version = "0.0.1" +version = "0.1.1" authors = ["Trinity Pointard ", "Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -15,6 +15,7 @@ log = "0.4" rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } rusoto_credential = "0.48.0" rusoto_signature = "0.48.0" +hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] } serde = "1.0.137" serde_json = "1.0.81" thiserror = "1.0.31" diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs index 925ebeb8..cdd63cce 100644 --- a/src/k2v-client/bin/k2v-cli.rs +++ b/src/k2v-client/bin/k2v-cli.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; +use std::process::exit; use std::time::Duration; use k2v_client::*; @@ -57,22 +59,39 @@ enum Command { #[clap(flatten)] output_kind: ReadOutputKind, }, - /// Watch changes on a single value - Poll { - /// Partition key to delete from + /// Watch changes on a single value + PollItem { + /// Partition key of item to watch partition_key: String, - /// Sort key to delete from + /// Sort key of item to watch sort_key: String, /// Causality information #[clap(short, long)] causality: String, /// Timeout, in seconds - #[clap(short, long)] + #[clap(short = 'T', long)] timeout: Option, /// Output formating #[clap(flatten)] output_kind: ReadOutputKind, }, + /// Watch changes on a range of values + PollRange { + /// Partition key to poll from + partition_key: String, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + /// Marker of data that had previously been seen by a PollRange + #[clap(short = 'S', long)] + seen_marker: Option, + /// Timeout, in seconds + #[clap(short = 'T', long)] + timeout: Option, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + }, /// Delete a single value Delete { /// Partition key to delete from @@ -176,7 +195,6 @@ struct ReadOutputKind { impl ReadOutputKind { fn display_output(&self, val: CausalValue) -> ! { use std::io::Write; - use std::process::exit; if self.json { let stdout = std::io::stdout(); @@ -254,6 +272,83 @@ struct BatchOutputKind { json: bool, } +impl BatchOutputKind { + fn display_human_output(&self, values: BTreeMap) -> ! 
{ + for (key, values) in values { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + exit(0); + } + + fn values_json(&self, values: BTreeMap) -> Vec { + values + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>() + } + + fn display_poll_range_output( + &self, + seen_marker: String, + values: BTreeMap, + ) -> ! { + if self.json { + let json = serde_json::json!({ + "values": self.values_json(values), + "seen_marker": seen_marker, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + println!("seen marker: {}", seen_marker); + self.display_human_output(values) + } + } + + fn display_read_range_output(&self, res: PaginatedRange) -> ! { + if self.json { + let json = serde_json::json!({ + "next_key": res.next_start, + "values": self.values_json(res.items), + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + self.display_human_output(res.items) + } + } +} + /// Filter for batch operations #[derive(Parser, Debug)] #[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] @@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> { let res = client.read_item(&partition_key, &sort_key).await?; output_kind.display_output(res); } - Command::Poll { + Command::PollItem { partition_key, sort_key, causality, @@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> { if let Some(res) = res_opt { output_kind.display_output(res); } else { - println!("Delay expired and value didn't change."); + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } + } + Command::PollRange { + partition_key, + filter, + seen_marker, + timeout, + output_kind, + } => { + if filter.conflicts_only + || filter.tombstones + || filter.reverse + || filter.limit.is_some() + { + return Err(Error::Message( + "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(), + )); + } + + let timeout = timeout.map(Duration::from_secs); + let res = client + .poll_range( + &partition_key, + Some(PollRangeFilter { + start: filter.start.as_deref(), + end: filter.end.as_deref(), + prefix: filter.prefix.as_deref(), + }), + seen_marker.as_deref(), + timeout, + ) + .await?; + match res { + Some((items, seen_marker)) => { + output_kind.display_poll_range_output(seen_marker, items); + } + None => { + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } } } Command::ReadIndex { @@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> { }; let mut res = client.read_batch(&[op]).await?; let res = res.pop().unwrap(); - if output_kind.json { - let values = res - .items - .into_iter() - .map(|(k, v)| { - let mut value = serde_json::to_value(v).unwrap(); - value - .as_object_mut() - .unwrap() - .insert("sort_key".to_owned(), k.into()); - value - }) - .collect::>(); - let json = 
serde_json::json!({ - "next_key": res.next_start, - "values": values, - }); - - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &json).unwrap(); - } else { - if let Some(next) = res.next_start { - println!("next key: {}", next); - } - for (key, values) in res.items { - println!("key: {}", key); - let causality: String = values.causality.into(); - println!("causality: {}", causality); - for value in values.value { - match value { - K2vValue::Value(v) => { - if let Ok(string) = std::str::from_utf8(&v) { - println!(" value(utf-8): {}", string); - } else { - println!(" value(base64): {}", base64::encode(&v)); - } - } - K2vValue::Tombstone => { - println!(" tombstone"); - } - } - } - } - } + output_kind.display_read_range_output(res); } Command::DeleteRange { partition_key, diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index c2606af4..ca52d0cf 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -40,7 +40,13 @@ impl K2vClient { creds: AwsCredentials, user_agent: Option, ) -> Result { - let mut client = HttpClient::new()?; + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let mut client = HttpClient::from_connector(connector); if let Some(ua) = user_agent { client.local_agent_prepend(ua); } else { @@ -153,6 +159,58 @@ impl K2vClient { } } + /// Perform a PollRange request, waiting for any change in a given range of keys + /// to occur + pub async fn poll_range( + &self, + partition_key: &str, + filter: Option>, + seen_marker: Option<&str>, + timeout: Option, + ) -> Result, String)>, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let request = PollRangeRequest { + filter: filter.unwrap_or_default(), + seen_marker, + timeout: timeout.as_secs(), + }; + + let mut req = SignedRequest::new( + "POST", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("poll_range", ""); + + let payload = serde_json::to_vec(&request)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + let resp: PollRangeResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect::>(); + + Ok(Some((items, resp.seen_marker))) + } + /// Perform an InsertItem request, inserting a value for a single pk+sk. pub async fn insert_item( &self, @@ -389,6 +447,12 @@ impl From for String { } } +impl AsRef for CausalityToken { + fn as_ref(&self) -> &str { + &self.0 + } +} + /// A value in K2V. can be either a binary value, or a tombstone. 
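
The request body built by `poll_range` above is worth seeing on the wire: the filter fields are flattened into the same JSON object as `seenMarker` and `timeout`, matching the body documented in the earlier k2v-spec.md patch. A sketch mirroring the `PollRangeFilter`/`PollRangeRequest` shapes from this patch, with made-up values:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct PollRangeFilter<'a> {
    start: Option<&'a str>,
    end: Option<&'a str>,
    prefix: Option<&'a str>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeRequest<'a> {
    // `flatten` lifts start/end/prefix into the top-level JSON object.
    #[serde(flatten)]
    filter: PollRangeFilter<'a>,
    seen_marker: Option<&'a str>, // serialized as "seenMarker"
    timeout: u64,
}

fn main() {
    let req = PollRangeRequest {
        filter: PollRangeFilter {
            start: Some("0391.000001973107"),
            end: None,
            prefix: Some("0391."),
        },
        seen_marker: Some("opaquestring123"),
        timeout: 300,
    };
    // Prints: {"start":"0391.000001973107","end":null,"prefix":"0391.",
    //          "seenMarker":"opaquestring123","timeout":300}
    println!("{}", serde_json::to_string(&req).unwrap());
}
```
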
#[derive(Debug, Clone, PartialEq, Eq)] pub enum K2vValue { @@ -466,6 +530,29 @@ pub struct Filter<'a> { pub reverse: bool, } +#[derive(Debug, Default, Clone, Serialize)] +pub struct PollRangeFilter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeRequest<'a> { + #[serde(flatten)] + filter: PollRangeFilter<'a>, + seen_marker: Option<&'a str>, + timeout: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeResponse { + items: Vec, + seen_marker: String, +} + impl<'a> Filter<'a> { fn insert_params(&self, req: &mut SignedRequest) { if let Some(start) = &self.start { From 09a3dad0f2c1641e4e300809dbdb3599b32efc03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 11:35:36 +0100 Subject: [PATCH 09/16] Lock once for insert_many --- src/model/k2v/rpc.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8b070885..04ab3ab9 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use async_trait::async_trait; @@ -72,7 +72,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. local_timestamp_tree: Mutex, + endpoint: Arc>, subscriptions: Arc, } @@ -323,7 +328,10 @@ impl K2VRpcHandler { // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -336,11 +344,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -352,12 +363,11 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result, Error> { - // Using a mutex on the local_timestamp_tree is not strictly necessary, - // but it helps to not try to do several inserts at the same time, - // which would create transaction conflicts and force many useless retries. 
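
The shape of this commit's refactor, reduced to a toy: instead of locking inside the per-item helper, the caller acquires the lock once and threads the guard through, so a whole batch shares one critical section. Names are illustrative, not from the patch:

```rust
use std::sync::{Mutex, MutexGuard};

// Taking `&MutexGuard` proves at compile time that the caller holds the
// lock for the duration of the call.
fn insert_one(_guard: &MutexGuard<'_, ()>, log: &mut Vec<u64>, item: u64) {
    log.push(item);
}

fn insert_many(lock: &Mutex<()>, log: &mut Vec<u64>, items: &[u64]) {
    let guard = lock.lock().unwrap(); // one acquisition per batch
    for &item in items {
        insert_one(&guard, log, item);
    }
}

fn main() {
    let lock = Mutex::new(());
    let mut log = Vec::new();
    insert_many(&lock, &mut log, &[1, 2, 3]);
    assert_eq!(log, vec![1, 2, 3]);
}
```
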
- let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); - + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result, Error> { let now = now_msec(); self.item_table From ba384e61c0951036b0c4fb394011f3498abf67ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 12:03:17 +0100 Subject: [PATCH 10/16] PollRange: return immediately if no seen marker is provided --- doc/drafts/k2v-spec.md | 7 ++++++ src/model/k2v/rpc.rs | 57 +++++++++++++++++++++++++++++------------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/doc/drafts/k2v-spec.md b/doc/drafts/k2v-spec.md index a335aee3..b3c79c08 100644 --- a/doc/drafts/k2v-spec.md +++ b/doc/drafts/k2v-spec.md @@ -723,6 +723,13 @@ The query body is a JSON object consisting of the following fields: The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes). +If no seen marker is known by the caller, it can do a PollRange call +without specifying `seenMarker`. In this case, the PollRange call will +complete immediately, and return the current content of the range (which +can be empty) and a seen marker to be used in further PollRange calls. This +is the only case in which PollRange might return an HTTP 200 with an empty +set of items. + The response is either: - A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 04ab3ab9..04801ebf 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -268,6 +268,8 @@ impl K2VRpcHandler { seen_str: Option, timeout_msec: u64, ) -> Result, String)>, Error> { + let has_seen_marker = seen_str.is_some(); + let mut seen = seen_str .as_deref() .map(RangeSeenMarker::decode) @@ -318,7 +320,7 @@ impl K2VRpcHandler { } } - if new_items.is_empty() { + if new_items.is_empty() && has_seen_marker { Ok(None) } else { Ok(Some((new_items, seen.encode()?))) @@ -432,16 +434,44 @@ impl K2VRpcHandler { range: &PollRange, seen_str: &Option, ) -> Result, Error> { - let seen = seen_str - .as_deref() - .map(RangeSeenMarker::decode) - .transpose()? - .unwrap_or_default(); + if let Some(seen_str) = seen_str { + let seen = RangeSeenMarker::decode(seen_str)?; + + // Subscribe now to all changes on that partition, + // so that new items that are inserted while we are reading the range + // will be seen in the loop below + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Check for the presence of any new items already stored in the item table + let mut new_items = self.poll_range_read_range(range, &seen)?; + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } else { + // If no seen marker was specified, we do not poll for anything. + // We return immediately with the set of known items (even if + // it is empty), which will give the client an inital view of + // the dataset and an initial seen marker for further + // PollRange calls. 
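
The calling convention this commit establishes, reduced to a synchronous mock (`MockRange` is illustrative, not the k2v-client API): the first call carries no seen marker and answers immediately with the current state plus an initial marker; subsequent calls pass the marker back and only report changes:

```rust
struct MockRange {
    version: u64,
}

impl MockRange {
    fn poll_range(&mut self, seen: Option<&str>) -> (Vec<String>, String) {
        match seen {
            // No marker: return the current content right away (even if
            // empty), together with a marker describing it.
            None => (
                vec![format!("item@v{}", self.version)],
                self.version.to_string(),
            ),
            // Marker given: the real API blocks until something changes;
            // here we just pretend one change happened.
            Some(_) => {
                self.version += 1;
                (
                    vec![format!("item@v{}", self.version)],
                    self.version.to_string(),
                )
            }
        }
    }
}

fn main() {
    let mut range = MockRange { version: 1 };
    let (initial, seen) = range.poll_range(None); // completes immediately
    assert_eq!(initial, vec!["item@v1".to_string()]);
    let (changed, _next_seen) = range.poll_range(Some(&seen));
    assert_eq!(changed, vec!["item@v2".to_string()]);
}
```
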
From bba13f40fc2e411347ea83960935b39cedb0a7c4 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 11 Jan 2023 12:27:19 +0100
Subject: [PATCH 11/16] Correctly return bad requests when seen marker is invalid

---
 src/api/k2v/batch.rs       |  6 +-----
 src/api/k2v/item.rs        | 10 ++++------
 src/model/k2v/causality.rs | 23 ++++++++++++++++-------
 src/model/k2v/rpc.rs       | 13 +++++++------
 src/model/k2v/seen.rs      | 16 +++++++++++-----
 5 files changed, 39 insertions(+), 29 deletions(-)

diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index be3fba07..844faf89 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -24,11 +24,7 @@ pub async fn handle_insert_batch(
 	let mut items2 = vec![];
 
 	for it in items {
-		let ct = it
-			.ct
-			.map(|s| CausalContext::parse(&s))
-			.transpose()
-			.ok_or_bad_request("Invalid causality token")?;
+		let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
 		let v = match it.v {
 			Some(vs) => {
 				DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index ebf34723..e7385bcc 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -133,9 +133,8 @@ pub async fn handle_insert_item(
 		.get(X_GARAGE_CAUSALITY_TOKEN)
 		.map(|s| s.to_str())
 		.transpose()?
-		.map(CausalContext::parse)
-		.transpose()
-		.ok_or_bad_request("Invalid causality token")?;
+		.map(CausalContext::parse_helper)
+		.transpose()?;
 
 	let body = hyper::body::to_bytes(req.into_body()).await?;
 	let value = DvvsValue::Value(body.to_vec());
@@ -169,9 +168,8 @@ pub async fn handle_delete_item(
 		.get(X_GARAGE_CAUSALITY_TOKEN)
 		.map(|s| s.to_str())
 		.transpose()?
-		.map(CausalContext::parse)
-		.transpose()
-		.ok_or_bad_request("Invalid causality token")?;
+		.map(CausalContext::parse_helper)
+		.transpose()?;
 
 	let value = DvvsValue::Deleted;
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 665b02b2..b1ec8035 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -15,6 +15,8 @@ use serde::{Deserialize, Serialize};
 
 use garage_util::data::*;
 
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+
 /// Node IDs used in K2V are u64 integers that are the abbreviation
 /// of full Garage node IDs which are 256-bit UUIDs.
 pub type K2VNodeId = u64;
@@ -50,6 +52,7 @@ impl CausalContext {
 	pub fn new() -> Self {
 		Self::default()
 	}
+
 	/// Make binary representation and encode in base64
 	pub fn serialize(&self) -> String {
 		let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@@ -66,12 +69,13 @@ impl CausalContext {
 		base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
 	}
-	/// Parse from base64-encoded binary representation
-	pub fn parse(s: &str) -> Result<Self, String> {
-		let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
-			.map_err(|e| format!("bad causality token base64: {}", e))?;
+
+	/// Parse from base64-encoded binary representation.
+	/// Returns None on error.
+	pub fn parse(s: &str) -> Option<Self> {
+		let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD).ok()?;
 		if bytes.len() % 16 != 8 || bytes.len() < 8 {
-			return Err("bad causality token length".into());
+			return None;
 		}
 
 		let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@@ -88,11 +92,16 @@ impl CausalContext {
 		let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
 
 		if check != checksum {
-			return Err("bad causality token checksum".into());
+			return None;
 		}
 
-		Ok(ret)
+		Some(ret)
 	}
+
+	pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
+		Self::parse(s).ok_or_bad_request("Invalid causality token")
+	}
+
 	/// Check if this causal context contains newer items than another one
 	pub fn is_newer_than(&self, other: &Self) -> bool {
 		vclock_gt(&self.vector_clock, &other.vector_clock)
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 04801ebf..3b4d7465 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -29,6 +29,7 @@ use garage_rpc::*;
 use garage_table::replication::{TableReplication, TableShardedReplication};
 use garage_table::{PartitionKey, Table};
 
+use crate::helper::error::Error as HelperError;
 use crate::k2v::causality::*;
 use crate::k2v::item_table::*;
 use crate::k2v::seen::*;
@@ -212,7 +213,7 @@ impl K2VRpcHandler {
 		sort_key: String,
 		causal_context: CausalContext,
 		timeout_msec: u64,
-	) -> Result<Option<K2VItem>, Error> {
+	) -> Result<Option<K2VItem>, HelperError> {
 		let poll_key = PollKey {
 			partition: K2VItemPartition {
 				bucket_id,
@@ -255,7 +256,7 @@ impl K2VRpcHandler {
 				}
 			}
 			K2VRpc::PollItemResponse(None) => (),
-			v => return Err(Error::unexpected_rpc_message(v)),
+			v => return Err(Error::unexpected_rpc_message(v).into()),
 		}
 	}
 
@@ -267,12 +268,12 @@ impl K2VRpcHandler {
 		range: PollRange,
 		seen_str: Option<String>,
 		timeout_msec: u64,
-	) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
+	) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
 		let has_seen_marker = seen_str.is_some();
 
 		let mut seen = seen_str
 			.as_deref()
-			.map(RangeSeenMarker::decode)
+			.map(RangeSeenMarker::decode_helper)
 			.transpose()?
 			.unwrap_or_default();
 
 		seen.restrict(&range);
@@ -316,7 +317,7 @@ impl K2VRpcHandler {
 				}
 			}
 		} else {
-			return Err(Error::unexpected_rpc_message(v));
+			return Err(Error::unexpected_rpc_message(v).into());
 		}
 	}
 
@@ -435,7 +436,7 @@ impl K2VRpcHandler {
 		seen_str: &Option<String>,
 	) -> Result<Vec<K2VItem>, Error> {
 		if let Some(seen_str) = seen_str {
-			let seen = RangeSeenMarker::decode(seen_str)?;
+			let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
 
 			// Subscribe now to all changes on that partition,
 			// so that new items that are inserted while we are reading the range
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
index d2cd54c7..314d0f9e 100644
--- a/src/model/k2v/seen.rs
+++ b/src/model/k2v/seen.rs
@@ -13,8 +13,9 @@ use serde::{Deserialize, Serialize};
 
 use garage_util::data::Uuid;
 use garage_util::encode::{nonversioned_decode, nonversioned_encode};
-use garage_util::error::{Error, OkOrMessage};
+use garage_util::error::Error;
 
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
 use crate::k2v::causality::*;
 use crate::k2v::item_table::*;
 use crate::k2v::sub::*;
@@ -80,10 +81,15 @@ impl RangeSeenMarker {
 		Ok(base64::encode(&bytes))
 	}
 
-	pub fn decode(s: &str) -> Result<Self, Error> {
-		let bytes = base64::decode(&s).ok_or_message("invalid base64")?;
-		let bytes = zstd::stream::decode_all(&mut &bytes[..])?;
-		Ok(nonversioned_decode(&bytes)?)
+	/// Decode from msgpack+zstd+b64 representation, returns None on error.
+	pub fn decode(s: &str) -> Option<Self> {
+		let bytes = base64::decode(&s).ok()?;
+		let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
+		nonversioned_decode(&bytes).ok()
+	}
+
+	pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
+		Self::decode(s).ok_or_bad_request("Invalid seen marker")
 	}
 
 	pub fn is_new_item(&self, item: &K2VItem) -> bool {
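For context on what `parse` checks: a causality token is an XOR checksum over all (node, timestamp) integers, prepended to the pairs themselves, so a corrupted or truncated token fails closed and, with this patch, surfaces as a 400. A freestanding sketch of that layout under the same length and checksum rules (not the Garage implementation itself):

```rust
// Encode: 8-byte XOR checksum, then (node, timestamp) u64 pairs, big-endian.
fn encode(pairs: &[(u64, u64)]) -> Vec<u8> {
    let checksum = pairs.iter().fold(0u64, |acc, (n, t)| acc ^ n ^ t);
    let mut bytes = checksum.to_be_bytes().to_vec();
    for (n, t) in pairs {
        bytes.extend_from_slice(&n.to_be_bytes());
        bytes.extend_from_slice(&t.to_be_bytes());
    }
    bytes
}

// Decode with the same shape check as the patch: 8 bytes of checksum
// plus a whole number of 16-byte pairs; None on any mismatch.
fn parse(bytes: &[u8]) -> Option<Vec<(u64, u64)>> {
    if bytes.len() % 16 != 8 || bytes.len() < 8 {
        return None;
    }
    let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
    let pairs: Vec<(u64, u64)> = bytes[8..]
        .chunks(16)
        .map(|c| {
            (
                u64::from_be_bytes(c[..8].try_into().unwrap()),
                u64::from_be_bytes(c[8..].try_into().unwrap()),
            )
        })
        .collect();
    let check = pairs.iter().fold(0u64, |acc, (n, t)| acc ^ n ^ t);
    (check == checksum).then_some(pairs)
}

fn main() {
    let token = encode(&[(0x1234, 42), (0x5678, 43)]);
    assert!(parse(&token).is_some());
    assert!(parse(&token[..token.len() - 1]).is_none()); // bad length
}
```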
From cbfae673e83251e988d764d3a29f06f571ba8452 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 11 Jan 2023 15:03:08 +0100
Subject: [PATCH 12/16] PollRange & PollItem: min timeout = 1 sec

---
 src/api/k2v/batch.rs | 2 +-
 src/api/k2v/item.rs  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 844faf89..abc9403c 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -256,7 +256,7 @@ pub(crate) async fn handle_poll_range(
 
 	let query = parse_json_body::<PollRangeQuery>(req).await?;
 
-	let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000;
+	let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
 
 	let resp = garage
 		.k2v
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index e7385bcc..787a7df3 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -206,7 +206,7 @@ pub async fn handle_poll_item(
 	let causal_context =
 		CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
 
-	let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000;
+	let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000;
 
 	let item = garage
 		.k2v
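The clamp change is small but worth spelling out; assuming the same expression as the patch, the effective timeouts come out as:

```rust
fn effective_timeout_msec(requested_secs: Option<u64>) -> u64 {
    // Default 300 s; minimum now 1 s (was 10 s); maximum 600 s.
    requested_secs.unwrap_or(300).clamp(1, 600) * 1000
}

fn main() {
    assert_eq!(effective_timeout_msec(None), 300_000);
    assert_eq!(effective_timeout_msec(Some(0)), 1_000); // raised to the 1 s floor
    assert_eq!(effective_timeout_msec(Some(2)), 2_000); // allowed since this patch
    assert_eq!(effective_timeout_msec(Some(3600)), 600_000); // capped at 10 min
}
```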
From 5b5ca63cf6d108e7bc7d83da68667163a278d30a Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 11 Jan 2023 15:17:27 +0100
Subject: [PATCH 13/16] Poll cleanup

---
 src/k2v-client/Cargo.toml | 2 +-
 src/model/k2v/rpc.rs      | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index d95a1b3f..7ab38e02 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -15,7 +15,7 @@ log = "0.4"
 rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
 rusoto_credential = "0.48.0"
 rusoto_signature = "0.48.0"
-hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] }
+hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
 serde = "1.0.137"
 serde_json = "1.0.81"
 thiserror = "1.0.31"
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 3b4d7465..f5d8ffc2 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -213,7 +213,7 @@ impl K2VRpcHandler {
 		sort_key: String,
 		causal_context: CausalContext,
 		timeout_msec: u64,
-	) -> Result<Option<K2VItem>, HelperError> {
+	) -> Result<Option<K2VItem>, Error> {
 		let poll_key = PollKey {
 			partition: K2VItemPartition {
 				bucket_id,
@@ -256,7 +256,7 @@ impl K2VRpcHandler {
 			}
 		}
 			K2VRpc::PollItemResponse(None) => (),
-			v => return Err(Error::unexpected_rpc_message(v).into()),
+			v => return Err(Error::unexpected_rpc_message(v)),
 		}
 	}
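Taken together, patches 11 and 13 draw a clean line between layers: decoding returns `Option`, the HTTP-facing helpers map `None` to a bad request, and node-to-node paths keep the internal error type, converting with `.into()` only at the boundary. A reduced sketch of that layering with stand-in error types (not Garage's):

```rust
// Stand-in error types mirroring the two layers in the patches.
#[derive(Debug)]
enum InternalError {
    Message(String),
}

#[derive(Debug)]
enum ApiError {
    BadRequest(String),
    Internal(InternalError),
}

// Internal errors can always be promoted into API errors; this is what
// the `.into()` calls added in patch 11 rely on.
impl From<InternalError> for ApiError {
    fn from(e: InternalError) -> Self {
        ApiError::Internal(e)
    }
}

// Parsing returns Option: the caller decides how severe a failure is.
fn decode(s: &str) -> Option<u64> {
    s.parse().ok()
}

// API layer: a malformed token is the client's fault -> 400 Bad Request.
fn decode_for_api(s: &str) -> Result<u64, ApiError> {
    decode(s).ok_or_else(|| ApiError::BadRequest("invalid marker".into()))
}

// RPC layer: the same failure coming from a peer is an internal error.
fn decode_for_rpc(s: &str) -> Result<u64, InternalError> {
    decode(s).ok_or_else(|| InternalError::Message("invalid marker".into()))
}

fn main() -> Result<(), ApiError> {
    assert!(decode_for_api("nope").is_err());
    let v = decode_for_rpc("12")?; // `?` promotes InternalError to ApiError
    assert_eq!(v, 12);
    Ok(())
}
```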
"0.48.0" rusoto_signature = "0.48.0" -hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] } +hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] } serde = "1.0.137" serde_json = "1.0.81" thiserror = "1.0.31" diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 3b4d7465..f5d8ffc2 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -213,7 +213,7 @@ impl K2VRpcHandler { sort_key: String, causal_context: CausalContext, timeout_msec: u64, - ) -> Result, HelperError> { + ) -> Result, Error> { let poll_key = PollKey { partition: K2VItemPartition { bucket_id, @@ -256,7 +256,7 @@ impl K2VRpcHandler { } } K2VRpc::PollItemResponse(None) => (), - v => return Err(Error::unexpected_rpc_message(v).into()), + v => return Err(Error::unexpected_rpc_message(v)), } } From 399f137fd079be2c59b032e67d9ccd8d9214407e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 15:19:51 +0100 Subject: [PATCH 14/16] add precision in pollrange doc --- doc/drafts/k2v-spec.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/doc/drafts/k2v-spec.md b/doc/drafts/k2v-spec.md index b3c79c08..faa1a247 100644 --- a/doc/drafts/k2v-spec.md +++ b/doc/drafts/k2v-spec.md @@ -723,13 +723,6 @@ The query body is a JSON object consisting of the following fields: The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes). -If no seen marker is known by the caller, it can do a PollRange call -without specifying `seenMarker`. In this case, the PollRange call will -complete immediately, and return the current content of the range (which -can be empty) and a seen marker to be used in further PollRange calls. This -is the only case in which PollRange might return an HTTP 200 with an empty -set of items. - The response is either: - A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred @@ -741,6 +734,16 @@ The response is either: | `seenMarker` | An opaque string that represents items already seen for future PollRange calls | | `items` | The list of items that have changed since last PollRange call, in the same format as ReadBatch | +If no seen marker is known by the caller, it can do a PollRange call +without specifying `seenMarker`. In this case, the PollRange call will +complete immediately, and return the current content of the range (which +can be empty) and a seen marker to be used in further PollRange calls. This +is the only case in which PollRange might return an HTTP 200 with an empty +set of items. + +A seen marker returned as a response to a PollRange query can be used for further PollRange +queries on the same range, or for PollRange queries in a subrange of the initial range. +It may not be used for PollRange queries on ranges larger or outside of the initial range. 
From 638c5a3ce006eba9d90a6358655ba2091423efd7 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 11 Jan 2023 16:12:07 +0100
Subject: [PATCH 15/16] PollRange: add extra RPC delay after quorum is
 achieved, to give a chance to the 3rd node to respond

---
 src/model/k2v/rpc.rs  | 77 +++++++++++++++++++++++++++++++++----------
 src/rpc/rpc_helper.rs |  5 ++-
 2 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f5d8ffc2..117103b6 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -8,7 +8,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::convert::TryInto;
 use std::sync::{Arc, Mutex, MutexGuard};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
 use futures::stream::FuturesUnordered;
@@ -35,6 +35,8 @@ use crate::k2v::item_table::*;
 use crate::k2v::seen::*;
 use crate::k2v::sub::*;
 
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
 const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
 
 /// RPC messages for K2V
@@ -271,6 +273,8 @@ impl K2VRpcHandler {
 	) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
 		let has_seen_marker = seen_str.is_some();
 
+		// Parse seen marker, we will use it below. This is also the first check
+		// that it is valid, which returns a bad request error if not.
 		let mut seen = seen_str
 			.as_deref()
 			.map(RangeSeenMarker::decode_helper)
 			.transpose()?
 			.unwrap_or_default();
 		seen.restrict(&range);
 
+		// Prepare PollRange RPC to send to the storage nodes responsible for the partition
 		let nodes = self
 			.item_table
 			.data
 			.replication
 			.write_nodes(&range.partition.hash());
-
-		let rpc = self.system.rpc.try_call_many(
-			&self.endpoint,
-			&nodes[..],
-			K2VRpc::PollRange {
-				range,
-				seen_str,
-				timeout_msec,
-			},
-			RequestStrategy::with_priority(PRIO_NORMAL)
-				.with_quorum(self.item_table.data.replication.read_quorum())
-				.without_timeout(),
-		);
-		let timeout_duration =
-			Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
-		let resps = select! {
-			r = rpc => r?,
-			_ = tokio::time::sleep(timeout_duration) => return Ok(None),
-		};
+		let quorum = self.item_table.data.replication.read_quorum();
+		let msg = K2VRpc::PollRange {
+			range,
+			seen_str,
+			timeout_msec,
+		};
+
+		// Send the request to all nodes, use FuturesUnordered to get the responses in any order
+		let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+		let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+		let mut requests = nodes
+			.iter()
+			.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+			.collect::<FuturesUnordered<_>>();
+
+		// Fetch responses. This procedure stops fetching responses when any of the following
+		// conditions arise:
+		// - we have a response to all requests
+		// - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+		//   has passed since the quorum was achieved
+		// - a global RPC timeout expired
+		// The extra delay after a quorum was received is useful in case the third response
+		// arrives during this short interval: this would allow us to consider all the data
+		// seen by that last node in the response we produce, and would likely help reduce
+		// the size of the seen marker that we will return (because we would have information
+		// of the kind: all items produced by that node until time ts have been returned, so
+		// we can bump the entry in the global vector clock and possibly remove some
+		// item-specific vector clocks)
+		let mut deadline =
+			Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+		let mut resps = vec![];
+		let mut errors = vec![];
+		loop {
+			select! {
+				_ = tokio::time::sleep_until(deadline.into()) => {
+					break;
+				}
+				res = requests.next() => match res {
+					None => break,
+					Some(Err(e)) => errors.push(e),
+					Some(Ok(r)) => {
+						resps.push(r);
+						if resps.len() >= quorum {
+							deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+						}
+					}
+				}
+			}
+		}
+		if errors.len() > nodes.len() - quorum {
+			let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+			return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+		}
+
+		// Take all returned items into account to produce the response.
 		let mut new_items = BTreeMap::<String, K2VItem>::new();
 		for v in resps {
 			if let K2VRpc::PollRangeResponse(node, items) = v {
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1ec250c3..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,9 @@ use opentelemetry::{
 };
 
 pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
 pub use netapp::message::{
-	Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
-	PRIO_SECONDARY,
+	IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+	PRIO_NORMAL, PRIO_SECONDARY,
 };
 use netapp::peering::fullmesh::FullMeshPeeringStrategy;
 pub use netapp::{self, NetApp, NodeID};
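The gathering loop in patch 15 is a reusable shape: drain a `FuturesUnordered` until everything has answered, a global deadline passes, or a quorum has answered and a short grace period elapses. A freestanding approximation using only `tokio` (with the `time`, `macros` and `rt` features) and `futures`; error handling and RPC specifics are omitted:

```rust
use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};

// Gather results until: all futures are done, a global deadline passes,
// or a quorum is reached and a short extra grace period has elapsed.
async fn gather_with_quorum<F, T>(
    futures: impl IntoIterator<Item = F>,
    quorum: usize,
    global_timeout: Duration,
    extra_delay: Duration,
) -> Vec<T>
where
    F: std::future::Future<Output = T>,
{
    let mut pending: FuturesUnordered<F> = futures.into_iter().collect();
    let mut deadline = Instant::now() + global_timeout;
    let mut results = vec![];
    loop {
        tokio::select! {
            _ = tokio::time::sleep_until(deadline.into()) => break,
            res = pending.next() => match res {
                None => break, // all futures completed
                Some(r) => {
                    results.push(r);
                    if results.len() >= quorum {
                        // Quorum reached: linger briefly for stragglers.
                        deadline = deadline.min(Instant::now() + extra_delay);
                    }
                }
            },
        }
    }
    results
}

#[tokio::main]
async fn main() {
    let tasks = (0..3u64).map(|i| async move {
        tokio::time::sleep(Duration::from_millis(50 * i)).await;
        i
    });
    let got =
        gather_with_quorum(tasks, 2, Duration::from_secs(2), Duration::from_millis(200)).await;
    assert!(got.len() >= 2);
}
```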
From 1dff62564fdda392a97986dca55232f30a1f4234 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 26 Jan 2023 17:05:31 +0100
Subject: [PATCH 16/16] fix clippy

---
 src/garage/admin.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 4eabebca..2ef3077c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -911,7 +911,9 @@ impl AdminRpcHandler {
 			let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
 			let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
 			let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
-			let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into());
+			let capacity = role
+				.map(|x| x.capacity_string())
+				.unwrap_or_else(|| "?".into());
 			let avail_str = |x| match x {
 				Some((avail, total)) => {
 					let pct = (avail as f64) / (total as f64) * 100.;