diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index e5497215..8b070885 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -5,7 +5,7 @@ //! node does not process the entry directly, as this would //! mean the vector clock gets much larger than needed). -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -31,6 +31,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::seen::*; use crate::k2v::sub::*; const TIMESTAMP_KEY: &'static [u8] = b"timestamp"; @@ -46,7 +47,13 @@ enum K2VRpc { causal_context: CausalContext, timeout_msec: u64, }, + PollRange { + range: PollRange, + seen_str: Option, + timeout_msec: u64, + }, PollItemResponse(Option), + PollRangeResponse(Uuid, Vec), } #[derive(Debug, Serialize, Deserialize)] @@ -242,9 +249,7 @@ impl K2VRpcHandler { resp = Some(x); } } - K2VRpc::PollItemResponse(None) => { - return Ok(None); - } + K2VRpc::PollItemResponse(None) => (), v => return Err(Error::unexpected_rpc_message(v)), } } @@ -252,6 +257,69 @@ impl K2VRpcHandler { Ok(resp) } + pub async fn poll_range( + &self, + range: PollRange, + seen_str: Option, + timeout_msec: u64, + ) -> Result, String)>, Error> { + let mut seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode) + .transpose()? + .unwrap_or_default(); + seen.restrict(&range); + + let nodes = self + .item_table + .data + .replication + .write_nodes(&range.partition.hash()); + + let rpc = self.system.rpc.try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .without_timeout(), + ); + let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let resps = select! { + r = rpc => r?, + _ = tokio::time::sleep(timeout_duration) => return Ok(None), + }; + + let mut new_items = BTreeMap::::new(); + for v in resps { + if let K2VRpc::PollRangeResponse(node, items) = v { + seen.mark_seen_node_items(node, items.iter()); + for item in items.into_iter() { + match new_items.get_mut(&item.sort_key) { + Some(ent) => { + ent.merge(&item); + } + None => { + new_items.insert(item.sort_key.clone(), item); + } + } + } + } else { + return Err(Error::unexpected_rpc_message(v)); + } + } + + if new_items.is_empty() { + Ok(None) + } else { + Ok(Some((new_items, seen.encode()?))) + } + } + // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { @@ -348,6 +416,52 @@ impl K2VRpcHandler { Ok(value) } + + async fn handle_poll_range( + &self, + range: &PollRange, + seen_str: &Option, + ) -> Result, Error> { + let seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode) + .transpose()? + .unwrap_or_default(); + let mut new_items = vec![]; + + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Read current state of the specified range to check new items + let partition_hash = range.partition.hash(); + let first_key = match &range.start { + None => partition_hash.to_vec(), + Some(sk) => self.item_table.data.tree_key(&range.partition, sk), + }; + for item in self.item_table.data.store.range(first_key..)? { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let item = self.item_table.data.decode_entry(&value)?; + if !range.matches(&item) { + break; + } + if seen.is_new_item(&item) { + new_items.push(item); + } + } + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } } #[async_trait] @@ -367,6 +481,17 @@ impl EndpointHandler for K2VRpcHandler { _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! { + ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)), + _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])), + } + } m => Err(Error::unexpected_rpc_message(m)), } } diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index b8f4ff27..d2cd54c7 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -17,6 +17,7 @@ use garage_util::error::{Error, OkOrMessage}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::sub::*; #[derive(Debug, Serialize, Deserialize, Default)] pub struct RangeSeenMarker { @@ -29,6 +30,18 @@ impl RangeSeenMarker { Self::default() } + pub fn restrict(&mut self, range: &PollRange) { + if let Some(start) = &range.start { + self.items = self.items.split_off(start); + } + if let Some(end) = &range.end { + self.items.split_off(end); + } + if let Some(pfx) = &range.prefix { + self.items.retain(|k, _v| k.starts_with(pfx)); + } + } + pub fn mark_seen_node_items<'a, I: IntoIterator>( &mut self, node: Uuid, diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs index c4273dba..b1daa271 100644 --- a/src/model/k2v/sub.rs +++ b/src/model/k2v/sub.rs @@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex); #[derive(Default)] pub struct SubscriptionManagerInner { item_subscriptions: HashMap>, - range_subscriptions: HashMap>, + part_subscriptions: HashMap>, } impl SubscriptionManager { @@ -34,7 +34,7 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); if let Some(s) = inner.item_subscriptions.get(key) { s.subscribe() @@ -45,18 +45,21 @@ impl SubscriptionManager { } } - pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver { let mut inner = self.0.lock().unwrap(); - if let Some(s) = inner.range_subscriptions.get(key) { + if let Some(s) = inner.part_subscriptions.get(part) { s.subscribe() } else { let (tx, rx) = broadcast::channel(8); - inner.range_subscriptions.insert(key.clone(), tx); + inner.part_subscriptions.insert(part.clone(), tx); rx } } - pub fn notify(&self, item: &K2VItem) { + pub(crate) fn notify(&self, item: &K2VItem) { let mut inner = self.0.lock().unwrap(); // 1. Notify single item subscribers, @@ -73,20 +76,20 @@ impl SubscriptionManager { } } - // 2. Notify range subscribers, + // 2. Notify partition subscribers, // removing subscriptions with no more listeners if any - inner.range_subscriptions.retain(|sub, chan| { - if sub.matches(&item) { - chan.send(item.clone()).is_ok() - } else { - chan.receiver_count() != 0 + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); } - }); + } } } impl PollRange { - fn matches(&self, item: &K2VItem) -> bool { + pub fn matches(&self, item: &K2VItem) -> bool { item.partition == self.partition && self .prefix