K2V PollRange, version 2 #471

Merged
lx merged 18 commits from k2v-watch-range-2 into main 2023-01-26 16:19:05 +00:00
23 changed files with 1125 additions and 198 deletions

3
Cargo.lock generated
View file

@ -1828,12 +1828,13 @@ dependencies = [
[[package]]
name = "k2v-client"
version = "0.0.1"
version = "0.1.1"
dependencies = [
"base64 0.21.0",
"clap 4.1.3",
"garage_util",
"http",
"hyper-rustls 0.23.2",
"log",
"rusoto_core",
"rusoto_credential",

View file

@ -32,7 +32,7 @@ args@{
ignoreLockHash,
}:
let
nixifiedLockHash = "cf836c01a9c668bab5f9a09d468f47aa24c50abec92855503624d706721335ef";
nixifiedLockHash = "8681b26e2f98ae495ad3ac0d6df1241a6bc0f06bd5fa7ae4c08d8cc13b5b88c5";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@ -65,7 +65,7 @@ in
garage_api = rustPackages.unknown.garage_api."0.8.1";
garage_web = rustPackages.unknown.garage_web."0.8.1";
garage = rustPackages.unknown.garage."0.8.1";
k2v-client = rustPackages.unknown.k2v-client."0.0.1";
k2v-client = rustPackages.unknown.k2v-client."0.1.1";
};
"registry+https://github.com/rust-lang/crates.io-index".addr2line."0.19.0" = overridableMkRustCrate (profileName: rec {
name = "addr2line";
@ -2539,9 +2539,9 @@ in
};
});
"unknown".k2v-client."0.0.1" = overridableMkRustCrate (profileName: rec {
"unknown".k2v-client."0.1.1" = overridableMkRustCrate (profileName: rec {
name = "k2v-client";
version = "0.0.1";
version = "0.1.1";
registry = "unknown";
src = fetchCrateLocal (workspaceSrc + "/src/k2v-client");
features = builtins.concatLists [
@ -2554,6 +2554,7 @@ in
${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."4.1.3" { inherit profileName; }).out;
${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out;
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.2" { inherit profileName; }).out;
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.17" { inherit profileName; }).out;
rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out;
rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out;

View file

@ -706,6 +706,73 @@ HTTP/1.1 200 OK
]
```
**PollRange: `POST /<bucket>/<partition key>?poll_range`**, or alternatively<br/>
**PollRange: `SEARCH /<bucket>/<partition key>?poll_range`**
Polls a range of items for changes.
The query body is a JSON object consisting of the following fields:
| name | default value | meaning |
|-----------------|---------------|----------------------------------------------------------------------------------------|
| `prefix` | `null` | Restrict items to poll to those whose sort keys start with this prefix |
| `start` | `null` | The sort key of the first item to poll |
| `end` | `null` | The sort key of the last item to poll (excluded) |
| `timeout` | 300 | The timeout before 304 NOT MODIFIED is returned if no value in the range is updated |
| `seenMarker` | `null` | An opaque string returned by a previous PollRange call, that represents items already seen |
The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes).
The response is either:
- A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred
- A HTTP 200 response, indicating that some changes have occurred since the last PollRange call, in which case a JSON object is returned in the body with the following fields:
| name | meaning |
|-----------------|----------------------------------------------------------------------------------------|
| `seenMarker` | An opaque string that represents items already seen for future PollRange calls |
| `items` | The list of items that have changed since last PollRange call, in the same format as ReadBatch |
If no seen marker is known by the caller, it can do a PollRange call
without specifying `seenMarker`. In this case, the PollRange call will
complete immediately, and return the current content of the range (which
can be empty) and a seen marker to be used in further PollRange calls. This
is the only case in which PollRange might return an HTTP 200 with an empty
set of items.
A seen marker returned as a response to a PollRange query can be used for further PollRange
queries on the same range, or for PollRange queries in a subrange of the initial range.
It may not be used for PollRange queries on ranges larger or outside of the initial range.
Example query:
```json
SEARCH /my_bucket?poll_range HTTP/1.1
{
"prefix": "0391.",
"start": "0391.000001973107",
"seenMarker": "opaquestring123",
}
```
Example response:
```json
HTTP/1.1 200 OK
Content-Type: application/json
{
"seenMarker": "opaquestring456",
"items": [
{ sk: "0391.000001973221", ct: "opaquetoken123", v: ["b64cryptoblob123", "b64cryptoblob'123"] },
{ sk: "0391.000001974191", ct: "opaquetoken456", v: ["b64cryptoblob456", "b64cryptoblob'456"] },
]
}
```
## Internals: causality tokens

View file

@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer {
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
Endpoint::PollRange { partition_key } => {
handle_poll_range(garage, bucket_id, &partition_key, req).await
}
Endpoint::Options => unreachable!(),
};

View file

@ -5,7 +5,6 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@ -26,11 +25,7 @@ pub async fn handle_insert_batch(
let mut items2 = vec![];
for it in items {
let ct = it
.ct
.map(|s| CausalContext::parse(&s))
.transpose()
.ok_or_bad_request("Invalid causality token")?;
let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
let v = match it.v {
Some(vs) => DvvsValue::Value(
BASE64_STANDARD
@ -68,10 +63,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@ -163,10 +155,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@ -260,6 +249,53 @@ async fn handle_delete_batch_query(
})
}
pub(crate) async fn handle_poll_range(
garage: Arc<Garage>,
bucket_id: Uuid,
partition_key: &str,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
let resp = garage
.k2v
.rpc
.poll_range(
PollRange {
partition: K2VItemPartition {
bucket_id,
partition_key: partition_key.to_string(),
},
start: query.start,
end: query.end,
prefix: query.prefix,
},
query.seen_marker,
timeout_msec,
)
.await?;
if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse {
items: items
.into_iter()
.map(|(_k, i)| ReadBatchResponseItem::from(i))
.collect::<Vec<_>>(),
seen_marker,
};
Ok(json_ok_response(&resp)?)
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
.body(Body::empty())?)
}
}
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@ -364,3 +400,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
#[derive(Deserialize)]
struct PollRangeQuery {
#[serde(default)]
prefix: Option<String>,
#[serde(default)]
start: Option<String>,
#[serde(default)]
end: Option<String>,
#[serde(default)]
timeout: Option<u64>,
#[serde(default, rename = "seenMarker")]
seen_marker: Option<String>,
}
#[derive(Serialize)]
struct PollRangeResponse {
items: Vec<ReadBatchResponseItem>,
#[serde(rename = "seenMarker")]
seen_marker: String,
}

View file

@ -1,10 +1,9 @@
use std::sync::Arc;
use hyper::{Body, Response, StatusCode};
use hyper::{Body, Response};
use serde::Serialize;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::ring::Ring;
use garage_table::util::*;
@ -12,6 +11,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@ -68,10 +68,7 @@ pub async fn handle_read_index(
next_start,
};
let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resp)?)
}
#[derive(Serialize)]

View file

@ -134,9 +134,8 @@ pub async fn handle_insert_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
.map(CausalContext::parse)
.transpose()
.ok_or_bad_request("Invalid causality token")?;
.map(CausalContext::parse_helper)
.transpose()?;
let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec());
@ -170,9 +169,8 @@ pub async fn handle_delete_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
.map(CausalContext::parse)
.transpose()
.ok_or_bad_request("Invalid causality token")?;
.map(CausalContext::parse_helper)
.transpose()?;
let value = DvvsValue::Deleted;
@ -209,15 +207,17 @@ pub async fn handle_poll_item(
let causal_context =
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000;
let item = garage
.k2v
.rpc
.poll(
.poll_item(
bucket_id,
partition_key,
sort_key,
causal_context,
timeout_secs.unwrap_or(300) * 1000,
timeout_msec,
)
.await?;

View file

@ -32,6 +32,9 @@ pub enum Endpoint {
causality_token: String,
timeout: Option<u64>,
},
PollRange {
partition_key: String,
},
ReadBatch {
},
ReadIndex {
@ -113,6 +116,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
POLL_RANGE => PollRange,
],
no_key: [
EMPTY => ReadBatch,
@ -142,6 +146,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
POLL_RANGE => PollRange,
],
no_key: [
EMPTY => InsertBatch,
@ -234,7 +239,8 @@ impl Endpoint {
generateQueryParameters! {
keywords: [
"delete" => DELETE,
"search" => SEARCH
"search" => SEARCH,
"poll_range" => POLL_RANGE
],
fields: [
"prefix" => prefix,

View file

@ -911,7 +911,9 @@ impl AdminRpcHandler {
let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into());
let capacity = role
.map(|x| x.capacity_string())
.unwrap_or_else(|| "?".into());
let avail_str = |x| match x {
Some((avail, total)) => {
let pct = (avail as f64) / (total as f64) * 100.;

View file

@ -1,12 +1,16 @@
use hyper::{Method, StatusCode};
use std::time::Duration;
use assert_json_diff::assert_json_eq;
use serde_json::json;
use super::json_body;
use crate::common;
#[tokio::test]
async fn test_poll() {
async fn test_poll_item() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-poll");
let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@ -96,3 +100,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
#[tokio::test]
async fn test_poll_range() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-poll-range");
// Write initial value
let res = ctx
.k2v
.request
.builder(bucket.clone())
.method(Method::PUT)
.path("root")
.query_param("sort_key", Some("test1"))
.body(b"Initial value".to_vec())
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Retrieve initial value to get its causality token
let res2 = ctx
.k2v
.request
.builder(bucket.clone())
.path("root")
.query_param("sort_key", Some("test1"))
.signed_header("accept", "application/octet-stream")
.send()
.await
.unwrap();
assert_eq!(res2.status(), StatusCode::OK);
let ct = res2
.headers()
.get("x-garage-causality-token")
.unwrap()
.to_str()
.unwrap()
.to_string();
// Initial poll range, retrieve single item and first seen_marker
let res2 = ctx
.k2v
.request
.builder(bucket.clone())
.method(Method::POST)
.path("root")
.query_param("poll_range", None::<String>)
.body(b"{}".to_vec())
.send()
.await
.unwrap();
assert_eq!(res2.status(), StatusCode::OK);
let json_res = json_body(res2).await;
let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
assert_json_eq!(
json_res,
json!(
{
"items": [
{"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]},
],
"seenMarker": seen_marker,
}
)
);
// Second poll range, which will complete later
let poll = {
let bucket = bucket.clone();
tokio::spawn(async move {
let ctx = common::context();
ctx.k2v
.request
.builder(bucket.clone())
.method(Method::POST)
.path("root")
.query_param("poll_range", None::<String>)
.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
.send()
.await
})
};
// Write new value that supersedes initial one
let res = ctx
.k2v
.request
.builder(bucket.clone())
.method(Method::PUT)
.path("root")
.query_param("sort_key", Some("test1"))
.signed_header("x-garage-causality-token", ct)
.body(b"New value".to_vec())
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Check poll finishes with correct value
let poll_res = tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
res = poll => res.unwrap().unwrap(),
};
assert_eq!(poll_res.status(), StatusCode::OK);
let json_res = json_body(poll_res).await;
let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
assert_json_eq!(
&json_res["items"][0]["v"],
json!([base64::encode(b"New value")])
);
// Now we will add a value on a different key
// Start a new poll operation
let poll = {
let bucket = bucket.clone();
tokio::spawn(async move {
let ctx = common::context();
ctx.k2v
.request
.builder(bucket.clone())
.method(Method::POST)
.path("root")
.query_param("poll_range", None::<String>)
.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
.send()
.await
})
};
// Write value on different key
let res = ctx
.k2v
.request
.builder(bucket.clone())
.method(Method::PUT)
.path("root")
.query_param("sort_key", Some("test2"))
.body(b"Other value".to_vec())
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Check poll finishes with correct value
let poll_res = tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
res = poll => res.unwrap().unwrap(),
};
assert_eq!(poll_res.status(), StatusCode::OK);
let json_res = json_body(poll_res).await;
assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
assert_json_eq!(
&json_res["items"][0]["v"],
json!([base64::encode(b"Other value")])
);
}

View file

@ -1,6 +1,6 @@
[package]
name = "k2v-client"
version = "0.0.1"
version = "0.1.1"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -15,6 +15,7 @@ log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0"

View file

@ -1,3 +1,5 @@
use std::collections::BTreeMap;
use std::process::exit;
use std::time::Duration;
use k2v_client::*;
@ -57,22 +59,39 @@ enum Command {
#[clap(flatten)]
output_kind: ReadOutputKind,
},
/// Watch changes on a single value
Poll {
/// Partition key to delete from
/// Watch changes on a single value
PollItem {
/// Partition key of item to watch
partition_key: String,
/// Sort key to delete from
/// Sort key of item to watch
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
/// Timeout, in seconds
#[clap(short, long)]
#[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
/// Watch changes on a range of values
PollRange {
/// Partition key to poll from
partition_key: String,
/// Output only sort keys matching this filter
#[clap(flatten)]
filter: Filter,
/// Marker of data that had previously been seen by a PollRange
#[clap(short = 'S', long)]
seen_marker: Option<String>,
/// Timeout, in seconds
#[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: BatchOutputKind,
},
/// Delete a single value
Delete {
/// Partition key to delete from
@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
use std::process::exit;
if self.json {
let stdout = std::io::stdout();
@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool,
}
impl BatchOutputKind {
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
for (key, values) in values {
println!("key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
match value {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
println!(" value(base64): {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
}
exit(0);
}
fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
values
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>()
}
fn display_poll_range_output(
&self,
seen_marker: String,
values: BTreeMap<String, CausalValue>,
) -> ! {
if self.json {
let json = serde_json::json!({
"values": self.values_json(values),
"seen_marker": seen_marker,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
println!("seen marker: {}", seen_marker);
self.display_human_output(values)
}
}
fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
if self.json {
let json = serde_json::json!({
"next_key": res.next_start,
"values": self.values_json(res.items),
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
self.display_human_output(res.items)
}
}
}
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
Command::Poll {
Command::PollItem {
partition_key,
sort_key,
causality,
@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
if let Some(res) = res_opt {
output_kind.display_output(res);
} else {
println!("Delay expired and value didn't change.");
if output_kind.json {
println!("null");
} else {
println!("Delay expired and value didn't change.");
}
}
}
Command::PollRange {
partition_key,
filter,
seen_marker,
timeout,
output_kind,
} => {
if filter.conflicts_only
|| filter.tombstones
|| filter.reverse
|| filter.limit.is_some()
{
return Err(Error::Message(
"limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
));
}
let timeout = timeout.map(Duration::from_secs);
let res = client
.poll_range(
&partition_key,
Some(PollRangeFilter {
start: filter.start.as_deref(),
end: filter.end.as_deref(),
prefix: filter.prefix.as_deref(),
}),
seen_marker.as_deref(),
timeout,
)
.await?;
match res {
Some((items, seen_marker)) => {
output_kind.display_poll_range_output(seen_marker, items);
}
None => {
if output_kind.json {
println!("null");
} else {
println!("Delay expired and value didn't change.");
}
}
}
}
Command::ReadIndex {
@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
if output_kind.json {
let values = res
.items
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
let json = serde_json::json!({
"next_key": res.next_start,
"values": values,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
for (key, values) in res.items {
println!("key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
match value {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
println!(" value(base64): {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
}
}
output_kind.display_read_range_output(res);
}
Command::DeleteRange {
partition_key,

View file

@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
let mut client = HttpClient::new()?;
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent {
client.local_agent_prepend(ua);
} else {
@ -153,6 +159,58 @@ impl K2vClient {
}
}
/// Perform a PollRange request, waiting for any change in a given range of keys
/// to occur
pub async fn poll_range(
&self,
partition_key: &str,
filter: Option<PollRangeFilter<'_>>,
seen_marker: Option<&str>,
timeout: Option<Duration>,
) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
let request = PollRangeRequest {
filter: filter.unwrap_or_default(),
seen_marker,
timeout: timeout.as_secs(),
};
let mut req = SignedRequest::new(
"POST",
SERVICE,
&self.region,
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("poll_range", "");
let payload = serde_json::to_vec(&request)?;
req.set_payload(Some(payload));
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED {
return Ok(None);
}
let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
let items = resp
.items
.into_iter()
.map(|BatchReadItem { sk, ct, v }| {
(
sk,
CausalValue {
causality: ct,
value: v,
},
)
})
.collect::<BTreeMap<_, _>>();
Ok(Some((items, resp.seen_marker)))
}
/// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item(
&self,
@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
}
}
impl AsRef<str> for CausalityToken {
fn as_ref(&self) -> &str {
&self.0
}
}
/// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue {
@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool,
}
#[derive(Debug, Default, Clone, Serialize)]
pub struct PollRangeFilter<'a> {
pub start: Option<&'a str>,
pub end: Option<&'a str>,
pub prefix: Option<&'a str>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeRequest<'a> {
#[serde(flatten)]
filter: PollRangeFilter<'a>,
seen_marker: Option<&'a str>,
timeout: u64,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeResponse {
items: Vec<BatchReadItem>,
seen_marker: String,
}
impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start {

View file

@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
use crate::k2v::{item_table::*, poll::*, rpc::*};
use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data
pub struct Garage {
@ -305,8 +305,10 @@ impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
@ -317,7 +319,9 @@ impl GarageK2V {
system.clone(),
db,
);
let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
info!("Initialize K2V RPC handler...");
let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
Self {
item_table,

View file

@ -1,3 +1,12 @@
//! Implements a CausalContext, which is a set of timestamps for each
//! node -- a vector clock --, indicating that the versions with
//! timestamps <= these numbers have been seen and can be
//! overwritten by a subsequent write.
//!
//! The textual representation of a CausalContext, which we call a
//! "causality token", is used in the API and must be sent along with
//! each write or delete operation to indicate the previously seen
//! versions that we want to overwrite or delete.
use base64::prelude::*;
use std::collections::BTreeMap;
@ -7,28 +16,44 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
pub type VectorClock = BTreeMap<K2VNodeId, u64>;
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
let mut tmp = [0u8; 8];
tmp.copy_from_slice(&node_id.as_slice()[..8]);
u64::from_be_bytes(tmp)
}
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
}
pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
let mut ret = a.clone();
for (n, ts) in b.iter() {
let ent = ret.entry(*n).or_insert(0);
*ent = std::cmp::max(*ts, *ent);
}
ret
}
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>,
pub vector_clock: VectorClock,
}
impl CausalContext {
/// Empty causality context
pub fn new_empty() -> Self {
Self {
vector_clock: BTreeMap::new(),
}
pub fn new() -> Self {
Self::default()
}
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@ -45,13 +70,13 @@ impl CausalContext {
BASE64_URL_SAFE_NO_PAD.encode(bytes)
}
/// Parse from base64-encoded binary representation
pub fn parse(s: &str) -> Result<Self, String> {
let bytes = BASE64_URL_SAFE_NO_PAD
.decode(s)
.map_err(|e| format!("bad causality token base64: {}", e))?;
/// Parse from base64-encoded binary representation.
/// Returns None on error.
pub fn parse(s: &str) -> Option<Self> {
let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 {
return Err("bad causality token length".into());
return None;
}
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@ -68,16 +93,19 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum {
return Err("bad causality token checksum".into());
return None;
}
Ok(ret)
Some(ret)
}
pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
Self::parse(s).ok_or_bad_request("Invalid causality token")
}
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
self.vector_clock
.iter()
.any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
vclock_gt(&self.vector_clock, &other.vector_clock)
}
}

View file

@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
use crate::k2v::poll::*;
use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
) {
node_ts: u64,
) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@ -98,12 +99,14 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
e.values.push((t_prev + 1, new_value));
let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
e.values.push((t_new, new_value));
t_new
}
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
let mut cc = CausalContext::new_empty();
let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}

View file

@ -1,6 +1,8 @@
pub mod causality;
pub mod seen;
pub mod item_table;
pub mod poll;
pub mod rpc;
pub mod sub;

View file

@ -1,50 +0,0 @@
use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollKey {
pub partition: K2VItemPartition,
pub sort_key: String,
}
#[derive(Default)]
pub struct SubscriptionManager {
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
subs.insert(key.clone(), tx);
rx
}
}
pub fn notify(&self, item: &K2VItem) {
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
subs.remove(&key);
}
}
}
}

View file

@ -5,9 +5,10 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::collections::{BTreeMap, HashMap};
use std::convert::TryInto;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
use garage_db as db;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::now_msec;
use garage_rpc::system::System;
use garage_rpc::*;
@ -25,9 +29,15 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::poll::*;
use crate::k2v::seen::*;
use crate::k2v::sub::*;
const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@ -40,7 +50,13 @@ enum K2VRpc {
causal_context: CausalContext,
timeout_msec: u64,
},
PollRange {
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
},
PollItemResponse(Option<K2VItem>),
PollRangeResponse(Uuid, Vec<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@ -59,6 +75,12 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
// Using a mutex on the local_timestamp_tree is not strictly necessary,
// but it helps to not try to do several inserts at the same time,
// which would create transaction conflicts and force many useless retries.
local_timestamp_tree: Mutex<db::Tree>,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@ -66,14 +88,19 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
let local_timestamp_tree = db
.open_tree("k2v_local_timestamp")
.expect("Unable to open DB tree for k2v local timestamp");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
local_timestamp_tree: Mutex::new(local_timestamp_tree),
endpoint,
subscriptions,
});
@ -181,7 +208,7 @@ impl K2VRpcHandler {
Ok(())
}
pub async fn poll(
pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@ -230,9 +257,7 @@ impl K2VRpcHandler {
resp = Some(x);
}
}
K2VRpc::PollItemResponse(None) => {
return Ok(None);
}
K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)),
}
}
@ -240,10 +265,117 @@ impl K2VRpcHandler {
Ok(resp)
}
pub async fn poll_range(
&self,
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
let has_seen_marker = seen_str.is_some();
// Parse seen marker, we will use it below. This is also the first check
// that it is valid, which returns a bad request error if not.
let mut seen = seen_str
.as_deref()
.map(RangeSeenMarker::decode_helper)
.transpose()?
.unwrap_or_default();
seen.restrict(&range);
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
let nodes = self
.item_table
.data
.replication
.write_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let msg = K2VRpc::PollRange {
range,
seen_str,
timeout_msec,
};
// Send the request to all nodes, use FuturesUnordered to get the responses in any order
let msg = msg.into_req().map_err(netapp::error::Error::from)?;
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
// conditions arise:
// - we have a response to all requests
// - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
// has passed since the quorum was achieved
// - a global RPC timeout expired
// The extra delay after a quorum was received is usefull if the third response was to
// arrive during this short interval: this would allow us to consider all the data seen
// by that last node in the response we produce, and would likely help reduce the
// size of the seen marker that we will return (because we would have an info of the
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
let mut deadline =
Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {
select! {
_ = tokio::time::sleep_until(deadline.into()) => {
break;
}
res = requests.next() => match res {
None => break,
Some(Err(e)) => errors.push(e),
Some(Ok(r)) => {
resps.push(r);
if resps.len() >= quorum {
deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
}
}
}
}
}
if errors.len() > nodes.len() - quorum {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
}
// Take all returned items into account to produce the response.
let mut new_items = BTreeMap::<String, K2VItem>::new();
for v in resps {
if let K2VRpc::PollRangeResponse(node, items) = v {
seen.mark_seen_node_items(node, items.iter());
for item in items.into_iter() {
match new_items.get_mut(&item.sort_key) {
Some(ent) => {
ent.merge(&item);
}
None => {
new_items.insert(item.sort_key.clone(), item);
}
}
}
} else {
return Err(Error::unexpected_rpc_message(v).into());
}
}
if new_items.is_empty() && has_seen_marker {
Ok(None)
} else {
Ok(Some((new_items, seen.encode()?)))
}
}
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
let new = self.local_insert(item)?;
let new = {
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
self.local_insert(&local_timestamp_tree, item)?
};
// Propagate to rest of network
if let Some(updated) = new {
@ -256,11 +388,14 @@ impl K2VRpcHandler {
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
for item in items {
let new = self.local_insert(item)?;
{
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
for item in items {
let new = self.local_insert(&local_timestamp_tree, item)?;
if let Some(updated) = new {
updated_vec.push(updated);
if let Some(updated) = new {
updated_vec.push(updated);
}
}
}
@ -272,10 +407,22 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
fn local_insert(
&self,
local_timestamp_tree: &MutexGuard<'_, db::Tree>,
item: &InsertedItem,
) -> Result<Option<K2VItem>, Error> {
let now = now_msec();
self.item_table
.data
.update_entry_with(&item.partition, &item.sort_key, |ent| {
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
let old_local_timestamp = tx
.get(&local_timestamp_tree, TIMESTAMP_KEY)?
.and_then(|x| x.try_into().ok())
.map(u64::from_be_bytes)
.unwrap_or_default();
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@ -283,13 +430,25 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
ent.update(self.system.id, &item.causal_context, item.value.clone());
ent
let new_local_timestamp = ent.update(
self.system.id,
&item.causal_context,
item.value.clone(),
std::cmp::max(old_local_timestamp, now),
);
tx.insert(
&local_timestamp_tree,
TIMESTAMP_KEY,
u64::to_be_bytes(new_local_timestamp),
)?;
Ok(ent)
})
}
async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
let mut chan = self.subscriptions.subscribe(key);
async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@ -311,6 +470,71 @@ impl K2VRpcHandler {
Ok(value)
}
async fn handle_poll_range(
&self,
range: &PollRange,
seen_str: &Option<String>,
) -> Result<Vec<K2VItem>, Error> {
if let Some(seen_str) = seen_str {
let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
// Subscribe now to all changes on that partition,
// so that new items that are inserted while we are reading the range
// will be seen in the loop below
let mut chan = self.subscriptions.subscribe_partition(&range.partition);
// Check for the presence of any new items already stored in the item table
let mut new_items = self.poll_range_read_range(range, &seen)?;
// If we found no new items, wait for a matching item to arrive
// on the channel
while new_items.is_empty() {
let item = chan.recv().await?;
if range.matches(&item) && seen.is_new_item(&item) {
new_items.push(item);
}
}
Ok(new_items)
} else {
// If no seen marker was specified, we do not poll for anything.
// We return immediately with the set of known items (even if
// it is empty), which will give the client an inital view of
// the dataset and an initial seen marker for further
// PollRange calls.
self.poll_range_read_range(range, &RangeSeenMarker::default())
}
}
fn poll_range_read_range(
&self,
range: &PollRange,
seen: &RangeSeenMarker,
) -> Result<Vec<K2VItem>, Error> {
let mut new_items = vec![];
let partition_hash = range.partition.hash();
let first_key = match &range.start {
None => partition_hash.to_vec(),
Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
};
for item in self.item_table.data.store.range(first_key..)? {
let (key, value) = item?;
if &key[..32] != partition_hash.as_slice() {
break;
}
let item = self.item_table.data.decode_entry(&value)?;
if !range.matches(&item) {
break;
}
if seen.is_new_item(&item) {
new_items.push(item);
}
}
Ok(new_items)
}
}
#[async_trait]
@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
K2VRpc::PollRange {
range,
seen_str,
timeout_msec,
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
_ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
}
}
m => Err(Error::unexpected_rpc_message(m)),
}
}

105
src/model/k2v/seen.rs Normal file
View file

@ -0,0 +1,105 @@
//! Implements a RangeSeenMarker, a data type used in the PollRange API
//! to indicate which items in the range have already been seen
//! and which have not been seen yet.
//!
//! It consists of a vector clock that indicates that for each node,
//! all items produced by that node with timestamps <= the value in the
//! vector clock has been seen, as well as a set of causal contexts for
//! individual items.
use std::collections::BTreeMap;
use base64::prelude::*;
use serde::{Deserialize, Serialize};
use garage_util::data::Uuid;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::sub::*;
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct RangeSeenMarker {
vector_clock: VectorClock,
items: BTreeMap<String, VectorClock>,
}
impl RangeSeenMarker {
pub fn new() -> Self {
Self::default()
}
pub fn restrict(&mut self, range: &PollRange) {
if let Some(start) = &range.start {
self.items = self.items.split_off(start);
}
if let Some(end) = &range.end {
self.items.split_off(end);
}
if let Some(pfx) = &range.prefix {
self.items.retain(|k, _v| k.starts_with(pfx));
}
}
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
&mut self,
node: Uuid,
items: I,
) {
let node = make_node_id(node);
for item in items.into_iter() {
let cc = item.causal_context();
if let Some(ts) = cc.vector_clock.get(&node) {
let ent = self.vector_clock.entry(node).or_insert(0);
*ent = std::cmp::max(*ent, *ts);
}
if vclock_gt(&cc.vector_clock, &self.vector_clock) {
match self.items.get_mut(&item.sort_key) {
None => {
self.items.insert(item.sort_key.clone(), cc.vector_clock);
}
Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
}
}
}
}
pub fn canonicalize(&mut self) {
let self_vc = &self.vector_clock;
self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
}
pub fn encode(&mut self) -> Result<String, Error> {
self.canonicalize();
let bytes = nonversioned_encode(&self)?;
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
Ok(BASE64_STANDARD.encode(&bytes))
}
/// Decode from msgpack+zstd+b64 representation, returns None on error.
pub fn decode(s: &str) -> Option<Self> {
let bytes = BASE64_STANDARD.decode(&s).ok()?;
let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
nonversioned_decode(&bytes).ok()
}
pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
Self::decode(s).ok_or_bad_request("Invalid causality token")
}
pub fn is_new_item(&self, item: &K2VItem) -> bool {
let cc = item.causal_context();
vclock_gt(&cc.vector_clock, &self.vector_clock)
&& self
.items
.get(&item.sort_key)
.map(|vc| vclock_gt(&cc.vector_clock, &vc))
.unwrap_or(true)
}
}

110
src/model/k2v/sub.rs Normal file
View file

@ -0,0 +1,110 @@
use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollKey {
pub partition: K2VItemPartition,
pub sort_key: String,
}
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollRange {
pub partition: K2VItemPartition,
pub prefix: Option<String>,
pub start: Option<String>,
pub end: Option<String>,
}
#[derive(Default)]
pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
#[derive(Default)]
pub struct SubscriptionManagerInner {
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.item_subscriptions.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
inner.item_subscriptions.insert(key.clone(), tx);
rx
}
}
pub(crate) fn subscribe_partition(
&self,
part: &K2VItemPartition,
) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.part_subscriptions.get(part) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
inner.part_subscriptions.insert(part.clone(), tx);
rx
}
}
pub(crate) fn notify(&self, item: &K2VItem) {
let mut inner = self.0.lock().unwrap();
// 1. Notify single item subscribers,
// removing subscriptions with no more listeners if any
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
if let Some(s) = inner.item_subscriptions.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
inner.item_subscriptions.remove(&key);
}
}
// 2. Notify partition subscribers,
// removing subscriptions with no more listeners if any
if let Some(s) = inner.part_subscriptions.get(&item.partition) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
inner.part_subscriptions.remove(&item.partition);
}
}
}
}
impl PollRange {
pub fn matches(&self, item: &K2VItem) -> bool {
item.partition == self.partition
&& self
.prefix
.as_ref()
.map(|x| item.sort_key.starts_with(x))
.unwrap_or(true)
&& self
.start
.as_ref()
.map(|x| item.sort_key >= *x)
.unwrap_or(true)
&& self
.end
.as_ref()
.map(|x| item.sort_key < *x)
.unwrap_or(true)
}
}

View file

@ -15,10 +15,9 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
use netapp::message::IntoReq;
pub use netapp::message::{
Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
PRIO_SECONDARY,
IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};

View file

@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
Some(mut ent) => {
ent.merge(&update);
ent
}
None => update.clone(),
})?;
self.update_entry_with(
update.partition_key(),
update.sort_key(),
|_tx, ent| match ent {
Some(mut ent) => {
ent.merge(&update);
Ok(ent)
}
None => Ok(update.clone()),
},
)?;
Ok(())
}
@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
&self,
partition_key: &F::P,
sort_key: &F::S,
f: impl Fn(Option<F::E>) -> F::E,
update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>,
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
None => (None, None, f(None)),
None => (None, None, update_fn(&mut tx, None)?),
};
// Changed can be true in two scenarios