K2V PollRange, version 2 #471
3 changed files with 159 additions and 18 deletions
|
@ -5,7 +5,7 @@
|
||||||
//! node does not process the entry directly, as this would
|
//! node does not process the entry directly, as this would
|
||||||
//! mean the vector clock gets much larger than needed).
|
//! mean the vector clock gets much larger than needed).
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -31,6 +31,7 @@ use garage_table::{PartitionKey, Table};
|
||||||
|
|
||||||
use crate::k2v::causality::*;
|
use crate::k2v::causality::*;
|
||||||
use crate::k2v::item_table::*;
|
use crate::k2v::item_table::*;
|
||||||
|
use crate::k2v::seen::*;
|
||||||
use crate::k2v::sub::*;
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
|
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
|
||||||
|
@ -46,7 +47,13 @@ enum K2VRpc {
|
||||||
causal_context: CausalContext,
|
causal_context: CausalContext,
|
||||||
timeout_msec: u64,
|
timeout_msec: u64,
|
||||||
},
|
},
|
||||||
|
PollRange {
|
||||||
|
range: PollRange,
|
||||||
|
seen_str: Option<String>,
|
||||||
|
timeout_msec: u64,
|
||||||
|
},
|
||||||
PollItemResponse(Option<K2VItem>),
|
PollItemResponse(Option<K2VItem>),
|
||||||
|
PollRangeResponse(Uuid, Vec<K2VItem>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -242,9 +249,7 @@ impl K2VRpcHandler {
|
||||||
resp = Some(x);
|
resp = Some(x);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
K2VRpc::PollItemResponse(None) => {
|
K2VRpc::PollItemResponse(None) => (),
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
v => return Err(Error::unexpected_rpc_message(v)),
|
v => return Err(Error::unexpected_rpc_message(v)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,6 +257,69 @@ impl K2VRpcHandler {
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn poll_range(
|
||||||
|
&self,
|
||||||
|
range: PollRange,
|
||||||
|
seen_str: Option<String>,
|
||||||
|
timeout_msec: u64,
|
||||||
|
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
|
||||||
|
let mut seen = seen_str
|
||||||
|
.as_deref()
|
||||||
|
.map(RangeSeenMarker::decode)
|
||||||
|
.transpose()?
|
||||||
|
.unwrap_or_default();
|
||||||
|
seen.restrict(&range);
|
||||||
|
|
||||||
|
let nodes = self
|
||||||
|
.item_table
|
||||||
|
.data
|
||||||
|
.replication
|
||||||
|
.write_nodes(&range.partition.hash());
|
||||||
|
|
||||||
|
let rpc = self.system.rpc.try_call_many(
|
||||||
|
&self.endpoint,
|
||||||
|
&nodes[..],
|
||||||
|
K2VRpc::PollRange {
|
||||||
|
range,
|
||||||
|
seen_str,
|
||||||
|
timeout_msec,
|
||||||
|
},
|
||||||
|
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||||
|
.with_quorum(self.item_table.data.replication.read_quorum())
|
||||||
|
.without_timeout(),
|
||||||
|
);
|
||||||
|
let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
|
||||||
|
let resps = select! {
|
||||||
|
r = rpc => r?,
|
||||||
|
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut new_items = BTreeMap::<String, K2VItem>::new();
|
||||||
|
for v in resps {
|
||||||
|
if let K2VRpc::PollRangeResponse(node, items) = v {
|
||||||
|
seen.mark_seen_node_items(node, items.iter());
|
||||||
|
for item in items.into_iter() {
|
||||||
|
match new_items.get_mut(&item.sort_key) {
|
||||||
|
Some(ent) => {
|
||||||
|
ent.merge(&item);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
new_items.insert(item.sort_key.clone(), item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(Error::unexpected_rpc_message(v));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if new_items.is_empty() {
|
||||||
|
Ok(None)
|
||||||
|
} else {
|
||||||
|
Ok(Some((new_items, seen.encode()?)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---- internal handlers ----
|
// ---- internal handlers ----
|
||||||
|
|
||||||
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
|
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
|
||||||
|
@ -348,6 +416,52 @@ impl K2VRpcHandler {
|
||||||
|
|
||||||
Ok(value)
|
Ok(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_poll_range(
|
||||||
|
&self,
|
||||||
|
range: &PollRange,
|
||||||
|
seen_str: &Option<String>,
|
||||||
|
) -> Result<Vec<K2VItem>, Error> {
|
||||||
|
let seen = seen_str
|
||||||
|
.as_deref()
|
||||||
|
.map(RangeSeenMarker::decode)
|
||||||
|
.transpose()?
|
||||||
|
.unwrap_or_default();
|
||||||
|
let mut new_items = vec![];
|
||||||
|
|
||||||
|
let mut chan = self.subscriptions.subscribe_partition(&range.partition);
|
||||||
|
|
||||||
|
// Read current state of the specified range to check new items
|
||||||
|
let partition_hash = range.partition.hash();
|
||||||
|
let first_key = match &range.start {
|
||||||
|
None => partition_hash.to_vec(),
|
||||||
|
Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
|
||||||
|
};
|
||||||
|
for item in self.item_table.data.store.range(first_key..)? {
|
||||||
|
let (key, value) = item?;
|
||||||
|
if &key[..32] != partition_hash.as_slice() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let item = self.item_table.data.decode_entry(&value)?;
|
||||||
|
if !range.matches(&item) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if seen.is_new_item(&item) {
|
||||||
|
new_items.push(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we found no new items, wait for a matching item to arrive
|
||||||
|
// on the channel
|
||||||
|
while new_items.is_empty() {
|
||||||
|
let item = chan.recv().await?;
|
||||||
|
if range.matches(&item) && seen.is_new_item(&item) {
|
||||||
|
new_items.push(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(new_items)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -367,6 +481,17 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
|
||||||
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
|
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
K2VRpc::PollRange {
|
||||||
|
range,
|
||||||
|
seen_str,
|
||||||
|
timeout_msec,
|
||||||
|
} => {
|
||||||
|
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
|
||||||
|
select! {
|
||||||
|
ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
|
||||||
|
_ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
|
||||||
|
}
|
||||||
|
}
|
||||||
m => Err(Error::unexpected_rpc_message(m)),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ use garage_util::error::{Error, OkOrMessage};
|
||||||
|
|
||||||
use crate::k2v::causality::*;
|
use crate::k2v::causality::*;
|
||||||
use crate::k2v::item_table::*;
|
use crate::k2v::item_table::*;
|
||||||
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||||
pub struct RangeSeenMarker {
|
pub struct RangeSeenMarker {
|
||||||
|
@ -29,6 +30,18 @@ impl RangeSeenMarker {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn restrict(&mut self, range: &PollRange) {
|
||||||
|
if let Some(start) = &range.start {
|
||||||
|
self.items = self.items.split_off(start);
|
||||||
|
}
|
||||||
|
if let Some(end) = &range.end {
|
||||||
|
self.items.split_off(end);
|
||||||
|
}
|
||||||
|
if let Some(pfx) = &range.prefix {
|
||||||
|
self.items.retain(|k, _v| k.starts_with(pfx));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
|
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
|
||||||
&mut self,
|
&mut self,
|
||||||
node: Uuid,
|
node: Uuid,
|
||||||
|
|
|
@ -26,7 +26,7 @@ pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct SubscriptionManagerInner {
|
pub struct SubscriptionManagerInner {
|
||||||
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
|
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
|
||||||
range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>,
|
part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SubscriptionManager {
|
impl SubscriptionManager {
|
||||||
|
@ -34,7 +34,7 @@ impl SubscriptionManager {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
||||||
let mut inner = self.0.lock().unwrap();
|
let mut inner = self.0.lock().unwrap();
|
||||||
if let Some(s) = inner.item_subscriptions.get(key) {
|
if let Some(s) = inner.item_subscriptions.get(key) {
|
||||||
s.subscribe()
|
s.subscribe()
|
||||||
|
@ -45,18 +45,21 @@ impl SubscriptionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> {
|
pub(crate) fn subscribe_partition(
|
||||||
|
&self,
|
||||||
|
part: &K2VItemPartition,
|
||||||
|
) -> broadcast::Receiver<K2VItem> {
|
||||||
let mut inner = self.0.lock().unwrap();
|
let mut inner = self.0.lock().unwrap();
|
||||||
if let Some(s) = inner.range_subscriptions.get(key) {
|
if let Some(s) = inner.part_subscriptions.get(part) {
|
||||||
s.subscribe()
|
s.subscribe()
|
||||||
} else {
|
} else {
|
||||||
let (tx, rx) = broadcast::channel(8);
|
let (tx, rx) = broadcast::channel(8);
|
||||||
inner.range_subscriptions.insert(key.clone(), tx);
|
inner.part_subscriptions.insert(part.clone(), tx);
|
||||||
rx
|
rx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notify(&self, item: &K2VItem) {
|
pub(crate) fn notify(&self, item: &K2VItem) {
|
||||||
let mut inner = self.0.lock().unwrap();
|
let mut inner = self.0.lock().unwrap();
|
||||||
|
|
||||||
// 1. Notify single item subscribers,
|
// 1. Notify single item subscribers,
|
||||||
|
@ -73,20 +76,20 @@ impl SubscriptionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Notify range subscribers,
|
// 2. Notify partition subscribers,
|
||||||
// removing subscriptions with no more listeners if any
|
// removing subscriptions with no more listeners if any
|
||||||
inner.range_subscriptions.retain(|sub, chan| {
|
if let Some(s) = inner.part_subscriptions.get(&item.partition) {
|
||||||
if sub.matches(&item) {
|
if s.send(item.clone()).is_err() {
|
||||||
chan.send(item.clone()).is_ok()
|
// no more subscribers, remove channel from here
|
||||||
} else {
|
// (we will re-create it later if we need to subscribe again)
|
||||||
chan.receiver_count() != 0
|
inner.part_subscriptions.remove(&item.partition);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PollRange {
|
impl PollRange {
|
||||||
fn matches(&self, item: &K2VItem) -> bool {
|
pub fn matches(&self, item: &K2VItem) -> bool {
|
||||||
item.partition == self.partition
|
item.partition == self.partition
|
||||||
&& self
|
&& self
|
||||||
.prefix
|
.prefix
|
||||||
|
|
Loading…
Reference in a new issue