K2V PollRange, version 2 #471
2 changed files with 46 additions and 18 deletions
|
@ -723,6 +723,13 @@ The query body is a JSON object consisting of the following fields:
|
||||||
|
|
||||||
The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes).
|
The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes).
|
||||||
|
|
||||||
|
If no seen marker is known by the caller, it can do a PollRange call
|
||||||
|
without specifying `seenMarker`. In this case, the PollRange call will
|
||||||
|
complete immediately, and return the current content of the range (which
|
||||||
|
can be empty) and a seen marker to be used in further PollRange calls. This
|
||||||
|
is the only case in which PollRange might return an HTTP 200 with an empty
|
||||||
|
set of items.
|
||||||
|
|
||||||
The response is either:
|
The response is either:
|
||||||
|
|
||||||
- A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred
|
- A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred
|
||||||
|
|
|
@ -268,6 +268,8 @@ impl K2VRpcHandler {
|
||||||
seen_str: Option<String>,
|
seen_str: Option<String>,
|
||||||
timeout_msec: u64,
|
timeout_msec: u64,
|
||||||
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
|
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
|
||||||
|
let has_seen_marker = seen_str.is_some();
|
||||||
|
|
||||||
let mut seen = seen_str
|
let mut seen = seen_str
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.map(RangeSeenMarker::decode)
|
.map(RangeSeenMarker::decode)
|
||||||
|
@ -318,7 +320,7 @@ impl K2VRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if new_items.is_empty() {
|
if new_items.is_empty() && has_seen_marker {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else {
|
} else {
|
||||||
Ok(Some((new_items, seen.encode()?)))
|
Ok(Some((new_items, seen.encode()?)))
|
||||||
|
@ -432,16 +434,44 @@ impl K2VRpcHandler {
|
||||||
range: &PollRange,
|
range: &PollRange,
|
||||||
seen_str: &Option<String>,
|
seen_str: &Option<String>,
|
||||||
) -> Result<Vec<K2VItem>, Error> {
|
) -> Result<Vec<K2VItem>, Error> {
|
||||||
let seen = seen_str
|
if let Some(seen_str) = seen_str {
|
||||||
.as_deref()
|
let seen = RangeSeenMarker::decode(seen_str)?;
|
||||||
.map(RangeSeenMarker::decode)
|
|
||||||
.transpose()?
|
|
||||||
.unwrap_or_default();
|
|
||||||
let mut new_items = vec![];
|
|
||||||
|
|
||||||
|
// Subscribe now to all changes on that partition,
|
||||||
|
// so that new items that are inserted while we are reading the range
|
||||||
|
// will be seen in the loop below
|
||||||
let mut chan = self.subscriptions.subscribe_partition(&range.partition);
|
let mut chan = self.subscriptions.subscribe_partition(&range.partition);
|
||||||
|
|
||||||
// Read current state of the specified range to check new items
|
// Check for the presence of any new items already stored in the item table
|
||||||
|
let mut new_items = self.poll_range_read_range(range, &seen)?;
|
||||||
|
|
||||||
|
// If we found no new items, wait for a matching item to arrive
|
||||||
|
// on the channel
|
||||||
|
while new_items.is_empty() {
|
||||||
|
let item = chan.recv().await?;
|
||||||
|
if range.matches(&item) && seen.is_new_item(&item) {
|
||||||
|
new_items.push(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(new_items)
|
||||||
|
} else {
|
||||||
|
// If no seen marker was specified, we do not poll for anything.
|
||||||
|
// We return immediately with the set of known items (even if
|
||||||
|
// it is empty), which will give the client an inital view of
|
||||||
|
// the dataset and an initial seen marker for further
|
||||||
|
// PollRange calls.
|
||||||
|
self.poll_range_read_range(range, &RangeSeenMarker::default())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_range_read_range(
|
||||||
|
&self,
|
||||||
|
range: &PollRange,
|
||||||
|
seen: &RangeSeenMarker,
|
||||||
|
) -> Result<Vec<K2VItem>, Error> {
|
||||||
|
let mut new_items = vec![];
|
||||||
|
|
||||||
let partition_hash = range.partition.hash();
|
let partition_hash = range.partition.hash();
|
||||||
let first_key = match &range.start {
|
let first_key = match &range.start {
|
||||||
None => partition_hash.to_vec(),
|
None => partition_hash.to_vec(),
|
||||||
|
@ -461,15 +491,6 @@ impl K2VRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we found no new items, wait for a matching item to arrive
|
|
||||||
// on the channel
|
|
||||||
while new_items.is_empty() {
|
|
||||||
let item = chan.recv().await?;
|
|
||||||
if range.matches(&item) && seen.is_new_item(&item) {
|
|
||||||
new_items.push(item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(new_items)
|
Ok(new_items)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue