forked from Deuxfleurs/garage
Merge pull request 'K2V PollRange, version 2' (#471) from k2v-watch-range-2 into main
Reviewed-on: Deuxfleurs/garage#471
This commit is contained in:
commit
246f7468cd
23 changed files with 1125 additions and 198 deletions
3
Cargo.lock
generated
3
Cargo.lock
generated
|
@ -1828,12 +1828,13 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "k2v-client"
|
||||
version = "0.0.1"
|
||||
version = "0.1.1"
|
||||
dependencies = [
|
||||
"base64 0.21.0",
|
||||
"clap 4.1.3",
|
||||
"garage_util",
|
||||
"http",
|
||||
"hyper-rustls 0.23.2",
|
||||
"log",
|
||||
"rusoto_core",
|
||||
"rusoto_credential",
|
||||
|
|
|
@ -32,7 +32,7 @@ args@{
|
|||
ignoreLockHash,
|
||||
}:
|
||||
let
|
||||
nixifiedLockHash = "cf836c01a9c668bab5f9a09d468f47aa24c50abec92855503624d706721335ef";
|
||||
nixifiedLockHash = "8681b26e2f98ae495ad3ac0d6df1241a6bc0f06bd5fa7ae4c08d8cc13b5b88c5";
|
||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||
lockHashIgnored = if ignoreLockHash
|
||||
|
@ -65,7 +65,7 @@ in
|
|||
garage_api = rustPackages.unknown.garage_api."0.8.1";
|
||||
garage_web = rustPackages.unknown.garage_web."0.8.1";
|
||||
garage = rustPackages.unknown.garage."0.8.1";
|
||||
k2v-client = rustPackages.unknown.k2v-client."0.0.1";
|
||||
k2v-client = rustPackages.unknown.k2v-client."0.1.1";
|
||||
};
|
||||
"registry+https://github.com/rust-lang/crates.io-index".addr2line."0.19.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "addr2line";
|
||||
|
@ -2539,9 +2539,9 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"unknown".k2v-client."0.0.1" = overridableMkRustCrate (profileName: rec {
|
||||
"unknown".k2v-client."0.1.1" = overridableMkRustCrate (profileName: rec {
|
||||
name = "k2v-client";
|
||||
version = "0.0.1";
|
||||
version = "0.1.1";
|
||||
registry = "unknown";
|
||||
src = fetchCrateLocal (workspaceSrc + "/src/k2v-client");
|
||||
features = builtins.concatLists [
|
||||
|
@ -2554,6 +2554,7 @@ in
|
|||
${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."4.1.3" { inherit profileName; }).out;
|
||||
${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out;
|
||||
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out;
|
||||
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.2" { inherit profileName; }).out;
|
||||
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.17" { inherit profileName; }).out;
|
||||
rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out;
|
||||
rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out;
|
||||
|
|
|
@ -706,6 +706,73 @@ HTTP/1.1 200 OK
|
|||
]
|
||||
```
|
||||
|
||||
**PollRange: `POST /<bucket>/<partition key>?poll_range`**, or alternatively<br/>
|
||||
**PollRange: `SEARCH /<bucket>/<partition key>?poll_range`**
|
||||
|
||||
Polls a range of items for changes.
|
||||
|
||||
The query body is a JSON object consisting of the following fields:
|
||||
|
||||
| name | default value | meaning |
|
||||
|-----------------|---------------|----------------------------------------------------------------------------------------|
|
||||
| `prefix` | `null` | Restrict items to poll to those whose sort keys start with this prefix |
|
||||
| `start` | `null` | The sort key of the first item to poll |
|
||||
| `end` | `null` | The sort key of the last item to poll (excluded) |
|
||||
| `timeout` | 300 | The timeout before 304 NOT MODIFIED is returned if no value in the range is updated |
|
||||
| `seenMarker` | `null` | An opaque string returned by a previous PollRange call, that represents items already seen |
|
||||
|
||||
The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes).
|
||||
|
||||
The response is either:
|
||||
|
||||
- A HTTP 304 NOT MODIFIED response with an empty body, if the timeout expired and no changes occurred
|
||||
|
||||
- A HTTP 200 response, indicating that some changes have occurred since the last PollRange call, in which case a JSON object is returned in the body with the following fields:
|
||||
|
||||
| name | meaning |
|
||||
|-----------------|----------------------------------------------------------------------------------------|
|
||||
| `seenMarker` | An opaque string that represents items already seen for future PollRange calls |
|
||||
| `items` | The list of items that have changed since last PollRange call, in the same format as ReadBatch |
|
||||
|
||||
If no seen marker is known by the caller, it can do a PollRange call
|
||||
without specifying `seenMarker`. In this case, the PollRange call will
|
||||
complete immediately, and return the current content of the range (which
|
||||
can be empty) and a seen marker to be used in further PollRange calls. This
|
||||
is the only case in which PollRange might return an HTTP 200 with an empty
|
||||
set of items.
|
||||
|
||||
A seen marker returned as a response to a PollRange query can be used for further PollRange
|
||||
queries on the same range, or for PollRange queries in a subrange of the initial range.
|
||||
It may not be used for PollRange queries on ranges larger or outside of the initial range.
|
||||
|
||||
Example query:
|
||||
|
||||
```json
|
||||
SEARCH /my_bucket?poll_range HTTP/1.1
|
||||
|
||||
{
|
||||
"prefix": "0391.",
|
||||
"start": "0391.000001973107",
|
||||
"seenMarker": "opaquestring123",
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
Example response:
|
||||
|
||||
```json
|
||||
HTTP/1.1 200 OK
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"seenMarker": "opaquestring456",
|
||||
"items": [
|
||||
{ sk: "0391.000001973221", ct: "opaquetoken123", v: ["b64cryptoblob123", "b64cryptoblob'123"] },
|
||||
{ sk: "0391.000001974191", ct: "opaquetoken456", v: ["b64cryptoblob456", "b64cryptoblob'456"] },
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Internals: causality tokens
|
||||
|
||||
|
|
|
@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer {
|
|||
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
|
||||
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
|
||||
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
|
||||
Endpoint::PollRange { partition_key } => {
|
||||
handle_poll_range(garage, bucket_id, &partition_key, req).await
|
||||
}
|
||||
Endpoint::Options => unreachable!(),
|
||||
};
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ use hyper::{Body, Request, Response, StatusCode};
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
use garage_table::{EnumerationOrder, TableSchema};
|
||||
|
||||
|
@ -26,11 +25,7 @@ pub async fn handle_insert_batch(
|
|||
|
||||
let mut items2 = vec![];
|
||||
for it in items {
|
||||
let ct = it
|
||||
.ct
|
||||
.map(|s| CausalContext::parse(&s))
|
||||
.transpose()
|
||||
.ok_or_bad_request("Invalid causality token")?;
|
||||
let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
|
||||
let v = match it.v {
|
||||
Some(vs) => DvvsValue::Value(
|
||||
BASE64_STANDARD
|
||||
|
@ -68,10 +63,7 @@ pub async fn handle_read_batch(
|
|||
resps.push(resp?);
|
||||
}
|
||||
|
||||
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::from(resp_json))?)
|
||||
Ok(json_ok_response(&resps)?)
|
||||
}
|
||||
|
||||
async fn handle_read_batch_query(
|
||||
|
@ -163,10 +155,7 @@ pub async fn handle_delete_batch(
|
|||
resps.push(resp?);
|
||||
}
|
||||
|
||||
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::from(resp_json))?)
|
||||
Ok(json_ok_response(&resps)?)
|
||||
}
|
||||
|
||||
async fn handle_delete_batch_query(
|
||||
|
@ -260,6 +249,53 @@ async fn handle_delete_batch_query(
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_poll_range(
|
||||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: &str,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
use garage_model::k2v::sub::PollRange;
|
||||
|
||||
let query = parse_json_body::<PollRangeQuery>(req).await?;
|
||||
|
||||
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
|
||||
|
||||
let resp = garage
|
||||
.k2v
|
||||
.rpc
|
||||
.poll_range(
|
||||
PollRange {
|
||||
partition: K2VItemPartition {
|
||||
bucket_id,
|
||||
partition_key: partition_key.to_string(),
|
||||
},
|
||||
start: query.start,
|
||||
end: query.end,
|
||||
prefix: query.prefix,
|
||||
},
|
||||
query.seen_marker,
|
||||
timeout_msec,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some((items, seen_marker)) = resp {
|
||||
let resp = PollRangeResponse {
|
||||
items: items
|
||||
.into_iter()
|
||||
.map(|(_k, i)| ReadBatchResponseItem::from(i))
|
||||
.collect::<Vec<_>>(),
|
||||
seen_marker,
|
||||
};
|
||||
|
||||
Ok(json_ok_response(&resp)?)
|
||||
} else {
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NOT_MODIFIED)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct InsertBatchItem {
|
||||
pk: String,
|
||||
|
@ -364,3 +400,24 @@ struct DeleteBatchResponse {
|
|||
#[serde(rename = "deletedItems")]
|
||||
deleted_items: usize,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PollRangeQuery {
|
||||
#[serde(default)]
|
||||
prefix: Option<String>,
|
||||
#[serde(default)]
|
||||
start: Option<String>,
|
||||
#[serde(default)]
|
||||
end: Option<String>,
|
||||
#[serde(default)]
|
||||
timeout: Option<u64>,
|
||||
#[serde(default, rename = "seenMarker")]
|
||||
seen_marker: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct PollRangeResponse {
|
||||
items: Vec<ReadBatchResponseItem>,
|
||||
#[serde(rename = "seenMarker")]
|
||||
seen_marker: String,
|
||||
}
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use hyper::{Body, Response, StatusCode};
|
||||
use hyper::{Body, Response};
|
||||
use serde::Serialize;
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
use garage_rpc::ring::Ring;
|
||||
use garage_table::util::*;
|
||||
|
@ -12,6 +11,7 @@ use garage_table::util::*;
|
|||
use garage_model::garage::Garage;
|
||||
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
||||
|
||||
use crate::helpers::*;
|
||||
use crate::k2v::error::*;
|
||||
use crate::k2v::range::read_range;
|
||||
|
||||
|
@ -68,10 +68,7 @@ pub async fn handle_read_index(
|
|||
next_start,
|
||||
};
|
||||
|
||||
let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::from(resp_json))?)
|
||||
Ok(json_ok_response(&resp)?)
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
|
@ -134,9 +134,8 @@ pub async fn handle_insert_item(
|
|||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||
.map(|s| s.to_str())
|
||||
.transpose()?
|
||||
.map(CausalContext::parse)
|
||||
.transpose()
|
||||
.ok_or_bad_request("Invalid causality token")?;
|
||||
.map(CausalContext::parse_helper)
|
||||
.transpose()?;
|
||||
|
||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let value = DvvsValue::Value(body.to_vec());
|
||||
|
@ -170,9 +169,8 @@ pub async fn handle_delete_item(
|
|||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||
.map(|s| s.to_str())
|
||||
.transpose()?
|
||||
.map(CausalContext::parse)
|
||||
.transpose()
|
||||
.ok_or_bad_request("Invalid causality token")?;
|
||||
.map(CausalContext::parse_helper)
|
||||
.transpose()?;
|
||||
|
||||
let value = DvvsValue::Deleted;
|
||||
|
||||
|
@ -209,15 +207,17 @@ pub async fn handle_poll_item(
|
|||
let causal_context =
|
||||
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
|
||||
|
||||
let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000;
|
||||
|
||||
let item = garage
|
||||
.k2v
|
||||
.rpc
|
||||
.poll(
|
||||
.poll_item(
|
||||
bucket_id,
|
||||
partition_key,
|
||||
sort_key,
|
||||
causal_context,
|
||||
timeout_secs.unwrap_or(300) * 1000,
|
||||
timeout_msec,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -32,6 +32,9 @@ pub enum Endpoint {
|
|||
causality_token: String,
|
||||
timeout: Option<u64>,
|
||||
},
|
||||
PollRange {
|
||||
partition_key: String,
|
||||
},
|
||||
ReadBatch {
|
||||
},
|
||||
ReadIndex {
|
||||
|
@ -113,6 +116,7 @@ impl Endpoint {
|
|||
@gen_parser
|
||||
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
|
||||
key: [
|
||||
POLL_RANGE => PollRange,
|
||||
],
|
||||
no_key: [
|
||||
EMPTY => ReadBatch,
|
||||
|
@ -142,6 +146,7 @@ impl Endpoint {
|
|||
@gen_parser
|
||||
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
|
||||
key: [
|
||||
POLL_RANGE => PollRange,
|
||||
],
|
||||
no_key: [
|
||||
EMPTY => InsertBatch,
|
||||
|
@ -234,7 +239,8 @@ impl Endpoint {
|
|||
generateQueryParameters! {
|
||||
keywords: [
|
||||
"delete" => DELETE,
|
||||
"search" => SEARCH
|
||||
"search" => SEARCH,
|
||||
"poll_range" => POLL_RANGE
|
||||
],
|
||||
fields: [
|
||||
"prefix" => prefix,
|
||||
|
|
|
@ -911,7 +911,9 @@ impl AdminRpcHandler {
|
|||
let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
|
||||
let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
|
||||
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
|
||||
let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into());
|
||||
let capacity = role
|
||||
.map(|x| x.capacity_string())
|
||||
.unwrap_or_else(|| "?".into());
|
||||
let avail_str = |x| match x {
|
||||
Some((avail, total)) => {
|
||||
let pct = (avail as f64) / (total as f64) * 100.;
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
use hyper::{Method, StatusCode};
|
||||
use std::time::Duration;
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
use serde_json::json;
|
||||
|
||||
use super::json_body;
|
||||
use crate::common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_poll() {
|
||||
async fn test_poll_item() {
|
||||
let ctx = common::context();
|
||||
let bucket = ctx.create_bucket("test-k2v-poll");
|
||||
let bucket = ctx.create_bucket("test-k2v-poll-item");
|
||||
|
||||
// Write initial value
|
||||
let res = ctx
|
||||
|
@ -96,3 +100,165 @@ async fn test_poll() {
|
|||
.to_vec();
|
||||
assert_eq!(poll_res_body, b"New value");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_poll_range() {
|
||||
let ctx = common::context();
|
||||
let bucket = ctx.create_bucket("test-k2v-poll-range");
|
||||
|
||||
// Write initial value
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::PUT)
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test1"))
|
||||
.body(b"Initial value".to_vec())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Retrieve initial value to get its causality token
|
||||
let res2 = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test1"))
|
||||
.signed_header("accept", "application/octet-stream")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res2.status(), StatusCode::OK);
|
||||
let ct = res2
|
||||
.headers()
|
||||
.get("x-garage-causality-token")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
// Initial poll range, retrieve single item and first seen_marker
|
||||
let res2 = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::POST)
|
||||
.path("root")
|
||||
.query_param("poll_range", None::<String>)
|
||||
.body(b"{}".to_vec())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res2.status(), StatusCode::OK);
|
||||
let json_res = json_body(res2).await;
|
||||
let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
|
||||
assert_json_eq!(
|
||||
json_res,
|
||||
json!(
|
||||
{
|
||||
"items": [
|
||||
{"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]},
|
||||
],
|
||||
"seenMarker": seen_marker,
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
// Second poll range, which will complete later
|
||||
let poll = {
|
||||
let bucket = bucket.clone();
|
||||
tokio::spawn(async move {
|
||||
let ctx = common::context();
|
||||
ctx.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::POST)
|
||||
.path("root")
|
||||
.query_param("poll_range", None::<String>)
|
||||
.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
|
||||
.send()
|
||||
.await
|
||||
})
|
||||
};
|
||||
|
||||
// Write new value that supersedes initial one
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::PUT)
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test1"))
|
||||
.signed_header("x-garage-causality-token", ct)
|
||||
.body(b"New value".to_vec())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check poll finishes with correct value
|
||||
let poll_res = tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
|
||||
res = poll => res.unwrap().unwrap(),
|
||||
};
|
||||
|
||||
assert_eq!(poll_res.status(), StatusCode::OK);
|
||||
let json_res = json_body(poll_res).await;
|
||||
let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
|
||||
assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
|
||||
assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
|
||||
assert_json_eq!(
|
||||
&json_res["items"][0]["v"],
|
||||
json!([base64::encode(b"New value")])
|
||||
);
|
||||
|
||||
// Now we will add a value on a different key
|
||||
// Start a new poll operation
|
||||
let poll = {
|
||||
let bucket = bucket.clone();
|
||||
tokio::spawn(async move {
|
||||
let ctx = common::context();
|
||||
ctx.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::POST)
|
||||
.path("root")
|
||||
.query_param("poll_range", None::<String>)
|
||||
.body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
|
||||
.send()
|
||||
.await
|
||||
})
|
||||
};
|
||||
|
||||
// Write value on different key
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::PUT)
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test2"))
|
||||
.body(b"Other value".to_vec())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Check poll finishes with correct value
|
||||
let poll_res = tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
|
||||
res = poll => res.unwrap().unwrap(),
|
||||
};
|
||||
|
||||
assert_eq!(poll_res.status(), StatusCode::OK);
|
||||
let json_res = json_body(poll_res).await;
|
||||
assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
|
||||
assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
|
||||
assert_json_eq!(
|
||||
&json_res["items"][0]["v"],
|
||||
json!([base64::encode(b"Other value")])
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "k2v-client"
|
||||
version = "0.0.1"
|
||||
version = "0.1.1"
|
||||
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
|
||||
edition = "2018"
|
||||
license = "AGPL-3.0"
|
||||
|
@ -15,6 +15,7 @@ log = "0.4"
|
|||
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
|
||||
rusoto_credential = "0.48.0"
|
||||
rusoto_signature = "0.48.0"
|
||||
hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
|
||||
serde = "1.0"
|
||||
serde_json = "1.0"
|
||||
thiserror = "1.0"
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::process::exit;
|
||||
use std::time::Duration;
|
||||
|
||||
use k2v_client::*;
|
||||
|
@ -57,22 +59,39 @@ enum Command {
|
|||
#[clap(flatten)]
|
||||
output_kind: ReadOutputKind,
|
||||
},
|
||||
/// Watch changes on a single value
|
||||
Poll {
|
||||
/// Partition key to delete from
|
||||
/// Watch changes on a single value
|
||||
PollItem {
|
||||
/// Partition key of item to watch
|
||||
partition_key: String,
|
||||
/// Sort key to delete from
|
||||
/// Sort key of item to watch
|
||||
sort_key: String,
|
||||
/// Causality information
|
||||
#[clap(short, long)]
|
||||
causality: String,
|
||||
/// Timeout, in seconds
|
||||
#[clap(short, long)]
|
||||
#[clap(short = 'T', long)]
|
||||
timeout: Option<u64>,
|
||||
/// Output formating
|
||||
#[clap(flatten)]
|
||||
output_kind: ReadOutputKind,
|
||||
},
|
||||
/// Watch changes on a range of values
|
||||
PollRange {
|
||||
/// Partition key to poll from
|
||||
partition_key: String,
|
||||
/// Output only sort keys matching this filter
|
||||
#[clap(flatten)]
|
||||
filter: Filter,
|
||||
/// Marker of data that had previously been seen by a PollRange
|
||||
#[clap(short = 'S', long)]
|
||||
seen_marker: Option<String>,
|
||||
/// Timeout, in seconds
|
||||
#[clap(short = 'T', long)]
|
||||
timeout: Option<u64>,
|
||||
/// Output formating
|
||||
#[clap(flatten)]
|
||||
output_kind: BatchOutputKind,
|
||||
},
|
||||
/// Delete a single value
|
||||
Delete {
|
||||
/// Partition key to delete from
|
||||
|
@ -176,7 +195,6 @@ struct ReadOutputKind {
|
|||
impl ReadOutputKind {
|
||||
fn display_output(&self, val: CausalValue) -> ! {
|
||||
use std::io::Write;
|
||||
use std::process::exit;
|
||||
|
||||
if self.json {
|
||||
let stdout = std::io::stdout();
|
||||
|
@ -254,6 +272,83 @@ struct BatchOutputKind {
|
|||
json: bool,
|
||||
}
|
||||
|
||||
impl BatchOutputKind {
|
||||
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
|
||||
for (key, values) in values {
|
||||
println!("key: {}", key);
|
||||
let causality: String = values.causality.into();
|
||||
println!("causality: {}", causality);
|
||||
for value in values.value {
|
||||
match value {
|
||||
K2vValue::Value(v) => {
|
||||
if let Ok(string) = std::str::from_utf8(&v) {
|
||||
println!(" value(utf-8): {}", string);
|
||||
} else {
|
||||
println!(" value(base64): {}", base64::encode(&v));
|
||||
}
|
||||
}
|
||||
K2vValue::Tombstone => {
|
||||
println!(" tombstone");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
exit(0);
|
||||
}
|
||||
|
||||
fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
|
||||
values
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
let mut value = serde_json::to_value(v).unwrap();
|
||||
value
|
||||
.as_object_mut()
|
||||
.unwrap()
|
||||
.insert("sort_key".to_owned(), k.into());
|
||||
value
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
fn display_poll_range_output(
|
||||
&self,
|
||||
seen_marker: String,
|
||||
values: BTreeMap<String, CausalValue>,
|
||||
) -> ! {
|
||||
if self.json {
|
||||
let json = serde_json::json!({
|
||||
"values": self.values_json(values),
|
||||
"seen_marker": seen_marker,
|
||||
});
|
||||
|
||||
let stdout = std::io::stdout();
|
||||
serde_json::to_writer_pretty(stdout, &json).unwrap();
|
||||
exit(0)
|
||||
} else {
|
||||
println!("seen marker: {}", seen_marker);
|
||||
self.display_human_output(values)
|
||||
}
|
||||
}
|
||||
|
||||
fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
|
||||
if self.json {
|
||||
let json = serde_json::json!({
|
||||
"next_key": res.next_start,
|
||||
"values": self.values_json(res.items),
|
||||
});
|
||||
|
||||
let stdout = std::io::stdout();
|
||||
serde_json::to_writer_pretty(stdout, &json).unwrap();
|
||||
exit(0)
|
||||
} else {
|
||||
if let Some(next) = res.next_start {
|
||||
println!("next key: {}", next);
|
||||
}
|
||||
self.display_human_output(res.items)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Filter for batch operations
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
|
||||
|
@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
|
|||
let res = client.read_item(&partition_key, &sort_key).await?;
|
||||
output_kind.display_output(res);
|
||||
}
|
||||
Command::Poll {
|
||||
Command::PollItem {
|
||||
partition_key,
|
||||
sort_key,
|
||||
causality,
|
||||
|
@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
|
|||
if let Some(res) = res_opt {
|
||||
output_kind.display_output(res);
|
||||
} else {
|
||||
println!("Delay expired and value didn't change.");
|
||||
if output_kind.json {
|
||||
println!("null");
|
||||
} else {
|
||||
println!("Delay expired and value didn't change.");
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::PollRange {
|
||||
partition_key,
|
||||
filter,
|
||||
seen_marker,
|
||||
timeout,
|
||||
output_kind,
|
||||
} => {
|
||||
if filter.conflicts_only
|
||||
|| filter.tombstones
|
||||
|| filter.reverse
|
||||
|| filter.limit.is_some()
|
||||
{
|
||||
return Err(Error::Message(
|
||||
"limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let timeout = timeout.map(Duration::from_secs);
|
||||
let res = client
|
||||
.poll_range(
|
||||
&partition_key,
|
||||
Some(PollRangeFilter {
|
||||
start: filter.start.as_deref(),
|
||||
end: filter.end.as_deref(),
|
||||
prefix: filter.prefix.as_deref(),
|
||||
}),
|
||||
seen_marker.as_deref(),
|
||||
timeout,
|
||||
)
|
||||
.await?;
|
||||
match res {
|
||||
Some((items, seen_marker)) => {
|
||||
output_kind.display_poll_range_output(seen_marker, items);
|
||||
}
|
||||
None => {
|
||||
if output_kind.json {
|
||||
println!("null");
|
||||
} else {
|
||||
println!("Delay expired and value didn't change.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Command::ReadIndex {
|
||||
|
@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
|
|||
};
|
||||
let mut res = client.read_batch(&[op]).await?;
|
||||
let res = res.pop().unwrap();
|
||||
if output_kind.json {
|
||||
let values = res
|
||||
.items
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
let mut value = serde_json::to_value(v).unwrap();
|
||||
value
|
||||
.as_object_mut()
|
||||
.unwrap()
|
||||
.insert("sort_key".to_owned(), k.into());
|
||||
value
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let json = serde_json::json!({
|
||||
"next_key": res.next_start,
|
||||
"values": values,
|
||||
});
|
||||
|
||||
let stdout = std::io::stdout();
|
||||
serde_json::to_writer_pretty(stdout, &json).unwrap();
|
||||
} else {
|
||||
if let Some(next) = res.next_start {
|
||||
println!("next key: {}", next);
|
||||
}
|
||||
for (key, values) in res.items {
|
||||
println!("key: {}", key);
|
||||
let causality: String = values.causality.into();
|
||||
println!("causality: {}", causality);
|
||||
for value in values.value {
|
||||
match value {
|
||||
K2vValue::Value(v) => {
|
||||
if let Ok(string) = std::str::from_utf8(&v) {
|
||||
println!(" value(utf-8): {}", string);
|
||||
} else {
|
||||
println!(" value(base64): {}", base64::encode(&v));
|
||||
}
|
||||
}
|
||||
K2vValue::Tombstone => {
|
||||
println!(" tombstone");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
output_kind.display_read_range_output(res);
|
||||
}
|
||||
Command::DeleteRange {
|
||||
partition_key,
|
||||
|
|
|
@ -40,7 +40,13 @@ impl K2vClient {
|
|||
creds: AwsCredentials,
|
||||
user_agent: Option<String>,
|
||||
) -> Result<Self, Error> {
|
||||
let mut client = HttpClient::new()?;
|
||||
let connector = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.enable_http2()
|
||||
.build();
|
||||
let mut client = HttpClient::from_connector(connector);
|
||||
if let Some(ua) = user_agent {
|
||||
client.local_agent_prepend(ua);
|
||||
} else {
|
||||
|
@ -153,6 +159,58 @@ impl K2vClient {
|
|||
}
|
||||
}
|
||||
|
||||
/// Perform a PollRange request, waiting for any change in a given range of keys
|
||||
/// to occur
|
||||
pub async fn poll_range(
|
||||
&self,
|
||||
partition_key: &str,
|
||||
filter: Option<PollRangeFilter<'_>>,
|
||||
seen_marker: Option<&str>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
|
||||
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
|
||||
|
||||
let request = PollRangeRequest {
|
||||
filter: filter.unwrap_or_default(),
|
||||
seen_marker,
|
||||
timeout: timeout.as_secs(),
|
||||
};
|
||||
|
||||
let mut req = SignedRequest::new(
|
||||
"POST",
|
||||
SERVICE,
|
||||
&self.region,
|
||||
&format!("/{}/{}", self.bucket, partition_key),
|
||||
);
|
||||
req.add_param("poll_range", "");
|
||||
|
||||
let payload = serde_json::to_vec(&request)?;
|
||||
req.set_payload(Some(payload));
|
||||
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
|
||||
|
||||
if res.status == StatusCode::NOT_MODIFIED {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
|
||||
|
||||
let items = resp
|
||||
.items
|
||||
.into_iter()
|
||||
.map(|BatchReadItem { sk, ct, v }| {
|
||||
(
|
||||
sk,
|
||||
CausalValue {
|
||||
causality: ct,
|
||||
value: v,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>();
|
||||
|
||||
Ok(Some((items, resp.seen_marker)))
|
||||
}
|
||||
|
||||
/// Perform an InsertItem request, inserting a value for a single pk+sk.
|
||||
pub async fn insert_item(
|
||||
&self,
|
||||
|
@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
|
|||
}
|
||||
}
|
||||
|
||||
impl AsRef<str> for CausalityToken {
|
||||
fn as_ref(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// A value in K2V. can be either a binary value, or a tombstone.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum K2vValue {
|
||||
|
@ -466,6 +530,29 @@ pub struct Filter<'a> {
|
|||
pub reverse: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Serialize)]
|
||||
pub struct PollRangeFilter<'a> {
|
||||
pub start: Option<&'a str>,
|
||||
pub end: Option<&'a str>,
|
||||
pub prefix: Option<&'a str>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct PollRangeRequest<'a> {
|
||||
#[serde(flatten)]
|
||||
filter: PollRangeFilter<'a>,
|
||||
seen_marker: Option<&'a str>,
|
||||
timeout: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct PollRangeResponse {
|
||||
items: Vec<BatchReadItem>,
|
||||
seen_marker: String,
|
||||
}
|
||||
|
||||
impl<'a> Filter<'a> {
|
||||
fn insert_params(&self, req: &mut SignedRequest) {
|
||||
if let Some(start) = &self.start {
|
||||
|
|
|
@ -27,7 +27,7 @@ use crate::index_counter::*;
|
|||
use crate::key_table::*;
|
||||
|
||||
#[cfg(feature = "k2v")]
|
||||
use crate::k2v::{item_table::*, poll::*, rpc::*};
|
||||
use crate::k2v::{item_table::*, rpc::*, sub::*};
|
||||
|
||||
/// An entire Garage full of data
|
||||
pub struct Garage {
|
||||
|
@ -305,8 +305,10 @@ impl GarageK2V {
|
|||
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
|
||||
info!("Initialize K2V counter table...");
|
||||
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
|
||||
|
||||
info!("Initialize K2V subscription manager...");
|
||||
let subscriptions = Arc::new(SubscriptionManager::new());
|
||||
|
||||
info!("Initialize K2V item table...");
|
||||
let item_table = Table::new(
|
||||
K2VItemTable {
|
||||
|
@ -317,7 +319,9 @@ impl GarageK2V {
|
|||
system.clone(),
|
||||
db,
|
||||
);
|
||||
let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
|
||||
|
||||
info!("Initialize K2V RPC handler...");
|
||||
let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
|
||||
|
||||
Self {
|
||||
item_table,
|
||||
|
|
|
@ -1,3 +1,12 @@
|
|||
//! Implements a CausalContext, which is a set of timestamps for each
|
||||
//! node -- a vector clock --, indicating that the versions with
|
||||
//! timestamps <= these numbers have been seen and can be
|
||||
//! overwritten by a subsequent write.
|
||||
//!
|
||||
//! The textual representation of a CausalContext, which we call a
|
||||
//! "causality token", is used in the API and must be sent along with
|
||||
//! each write or delete operation to indicate the previously seen
|
||||
//! versions that we want to overwrite or delete.
|
||||
use base64::prelude::*;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
@ -7,28 +16,44 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use garage_util::data::*;
|
||||
|
||||
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
|
||||
|
||||
/// Node IDs used in K2V are u64 integers that are the abbreviation
|
||||
/// of full Garage node IDs which are 256-bit UUIDs.
|
||||
pub type K2VNodeId = u64;
|
||||
|
||||
pub type VectorClock = BTreeMap<K2VNodeId, u64>;
|
||||
|
||||
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
|
||||
let mut tmp = [0u8; 8];
|
||||
tmp.copy_from_slice(&node_id.as_slice()[..8]);
|
||||
u64::from_be_bytes(tmp)
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||
pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
|
||||
a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
|
||||
}
|
||||
|
||||
pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
|
||||
let mut ret = a.clone();
|
||||
for (n, ts) in b.iter() {
|
||||
let ent = ret.entry(*n).or_insert(0);
|
||||
*ent = std::cmp::max(*ts, *ent);
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
|
||||
pub struct CausalContext {
|
||||
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
||||
pub vector_clock: VectorClock,
|
||||
}
|
||||
|
||||
impl CausalContext {
|
||||
/// Empty causality context
|
||||
pub fn new_empty() -> Self {
|
||||
Self {
|
||||
vector_clock: BTreeMap::new(),
|
||||
}
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Make binary representation and encode in base64
|
||||
pub fn serialize(&self) -> String {
|
||||
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
|
||||
|
@ -45,13 +70,13 @@ impl CausalContext {
|
|||
|
||||
BASE64_URL_SAFE_NO_PAD.encode(bytes)
|
||||
}
|
||||
/// Parse from base64-encoded binary representation
|
||||
pub fn parse(s: &str) -> Result<Self, String> {
|
||||
let bytes = BASE64_URL_SAFE_NO_PAD
|
||||
.decode(s)
|
||||
.map_err(|e| format!("bad causality token base64: {}", e))?;
|
||||
|
||||
/// Parse from base64-encoded binary representation.
|
||||
/// Returns None on error.
|
||||
pub fn parse(s: &str) -> Option<Self> {
|
||||
let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?;
|
||||
if bytes.len() % 16 != 8 || bytes.len() < 8 {
|
||||
return Err("bad causality token length".into());
|
||||
return None;
|
||||
}
|
||||
|
||||
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
|
||||
|
@ -68,16 +93,19 @@ impl CausalContext {
|
|||
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
|
||||
|
||||
if check != checksum {
|
||||
return Err("bad causality token checksum".into());
|
||||
return None;
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
Some(ret)
|
||||
}
|
||||
|
||||
pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
|
||||
Self::parse(s).ok_or_bad_request("Invalid causality token")
|
||||
}
|
||||
|
||||
/// Check if this causal context contains newer items than another one
|
||||
pub fn is_newer_than(&self, other: &Self) -> bool {
|
||||
self.vector_clock
|
||||
.iter()
|
||||
.any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
|
||||
vclock_gt(&self.vector_clock, &other.vector_clock)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ use garage_table::*;
|
|||
|
||||
use crate::index_counter::*;
|
||||
use crate::k2v::causality::*;
|
||||
use crate::k2v::poll::*;
|
||||
use crate::k2v::sub::*;
|
||||
|
||||
pub const ENTRIES: &str = "entries";
|
||||
pub const CONFLICTS: &str = "conflicts";
|
||||
|
@ -73,7 +73,8 @@ impl K2VItem {
|
|||
this_node: Uuid,
|
||||
context: &Option<CausalContext>,
|
||||
new_value: DvvsValue,
|
||||
) {
|
||||
node_ts: u64,
|
||||
) -> u64 {
|
||||
if let Some(context) = context {
|
||||
for (node, t_discard) in context.vector_clock.iter() {
|
||||
if let Some(e) = self.items.get_mut(node) {
|
||||
|
@ -98,12 +99,14 @@ impl K2VItem {
|
|||
values: vec![],
|
||||
});
|
||||
let t_prev = e.max_time();
|
||||
e.values.push((t_prev + 1, new_value));
|
||||
let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
|
||||
e.values.push((t_new, new_value));
|
||||
t_new
|
||||
}
|
||||
|
||||
/// Extract the causality context of a K2V Item
|
||||
pub fn causal_context(&self) -> CausalContext {
|
||||
let mut cc = CausalContext::new_empty();
|
||||
let mut cc = CausalContext::new();
|
||||
for (node, ent) in self.items.iter() {
|
||||
cc.vector_clock.insert(*node, ent.max_time());
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
pub mod causality;
|
||||
pub mod seen;
|
||||
|
||||
pub mod item_table;
|
||||
|
||||
pub mod poll;
|
||||
pub mod rpc;
|
||||
|
||||
pub mod sub;
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::k2v::item_table::*;
|
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct PollKey {
|
||||
pub partition: K2VItemPartition,
|
||||
pub sort_key: String,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SubscriptionManager {
|
||||
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
|
||||
}
|
||||
|
||||
impl SubscriptionManager {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
||||
let mut subs = self.subscriptions.lock().unwrap();
|
||||
if let Some(s) = subs.get(key) {
|
||||
s.subscribe()
|
||||
} else {
|
||||
let (tx, rx) = broadcast::channel(8);
|
||||
subs.insert(key.clone(), tx);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
pub fn notify(&self, item: &K2VItem) {
|
||||
let key = PollKey {
|
||||
partition: item.partition.clone(),
|
||||
sort_key: item.sort_key.clone(),
|
||||
};
|
||||
let mut subs = self.subscriptions.lock().unwrap();
|
||||
if let Some(s) = subs.get(&key) {
|
||||
if s.send(item.clone()).is_err() {
|
||||
// no more subscribers, remove channel from here
|
||||
// (we will re-create it later if we need to subscribe again)
|
||||
subs.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,9 +5,10 @@
|
|||
//! node does not process the entry directly, as this would
|
||||
//! mean the vector clock gets much larger than needed).
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::convert::TryInto;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::FuturesUnordered;
|
||||
|
@ -15,9 +16,12 @@ use futures::StreamExt;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use tokio::select;
|
||||
|
||||
use garage_db as db;
|
||||
|
||||
use garage_util::crdt::*;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::*;
|
||||
use garage_util::time::now_msec;
|
||||
|
||||
use garage_rpc::system::System;
|
||||
use garage_rpc::*;
|
||||
|
@ -25,9 +29,15 @@ use garage_rpc::*;
|
|||
use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||
use garage_table::{PartitionKey, Table};
|
||||
|
||||
use crate::helper::error::Error as HelperError;
|
||||
use crate::k2v::causality::*;
|
||||
use crate::k2v::item_table::*;
|
||||
use crate::k2v::poll::*;
|
||||
use crate::k2v::seen::*;
|
||||
use crate::k2v::sub::*;
|
||||
|
||||
const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
|
||||
|
||||
const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
|
||||
|
||||
/// RPC messages for K2V
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
@ -40,7 +50,13 @@ enum K2VRpc {
|
|||
causal_context: CausalContext,
|
||||
timeout_msec: u64,
|
||||
},
|
||||
PollRange {
|
||||
range: PollRange,
|
||||
seen_str: Option<String>,
|
||||
timeout_msec: u64,
|
||||
},
|
||||
PollItemResponse(Option<K2VItem>),
|
||||
PollRangeResponse(Uuid, Vec<K2VItem>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
@ -59,6 +75,12 @@ impl Rpc for K2VRpc {
|
|||
pub struct K2VRpcHandler {
|
||||
system: Arc<System>,
|
||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||
|
||||
// Using a mutex on the local_timestamp_tree is not strictly necessary,
|
||||
// but it helps to not try to do several inserts at the same time,
|
||||
// which would create transaction conflicts and force many useless retries.
|
||||
local_timestamp_tree: Mutex<db::Tree>,
|
||||
|
||||
endpoint: Arc<Endpoint<K2VRpc, Self>>,
|
||||
subscriptions: Arc<SubscriptionManager>,
|
||||
}
|
||||
|
@ -66,14 +88,19 @@ pub struct K2VRpcHandler {
|
|||
impl K2VRpcHandler {
|
||||
pub fn new(
|
||||
system: Arc<System>,
|
||||
db: &db::Db,
|
||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||
subscriptions: Arc<SubscriptionManager>,
|
||||
) -> Arc<Self> {
|
||||
let local_timestamp_tree = db
|
||||
.open_tree("k2v_local_timestamp")
|
||||
.expect("Unable to open DB tree for k2v local timestamp");
|
||||
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
|
||||
|
||||
let rpc_handler = Arc::new(Self {
|
||||
system,
|
||||
item_table,
|
||||
local_timestamp_tree: Mutex::new(local_timestamp_tree),
|
||||
endpoint,
|
||||
subscriptions,
|
||||
});
|
||||
|
@ -181,7 +208,7 @@ impl K2VRpcHandler {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn poll(
|
||||
pub async fn poll_item(
|
||||
&self,
|
||||
bucket_id: Uuid,
|
||||
partition_key: String,
|
||||
|
@ -230,9 +257,7 @@ impl K2VRpcHandler {
|
|||
resp = Some(x);
|
||||
}
|
||||
}
|
||||
K2VRpc::PollItemResponse(None) => {
|
||||
return Ok(None);
|
||||
}
|
||||
K2VRpc::PollItemResponse(None) => (),
|
||||
v => return Err(Error::unexpected_rpc_message(v)),
|
||||
}
|
||||
}
|
||||
|
@ -240,10 +265,117 @@ impl K2VRpcHandler {
|
|||
Ok(resp)
|
||||
}
|
||||
|
||||
pub async fn poll_range(
|
||||
&self,
|
||||
range: PollRange,
|
||||
seen_str: Option<String>,
|
||||
timeout_msec: u64,
|
||||
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
|
||||
let has_seen_marker = seen_str.is_some();
|
||||
|
||||
// Parse seen marker, we will use it below. This is also the first check
|
||||
// that it is valid, which returns a bad request error if not.
|
||||
let mut seen = seen_str
|
||||
.as_deref()
|
||||
.map(RangeSeenMarker::decode_helper)
|
||||
.transpose()?
|
||||
.unwrap_or_default();
|
||||
seen.restrict(&range);
|
||||
|
||||
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
|
||||
let nodes = self
|
||||
.item_table
|
||||
.data
|
||||
.replication
|
||||
.write_nodes(&range.partition.hash());
|
||||
let quorum = self.item_table.data.replication.read_quorum();
|
||||
let msg = K2VRpc::PollRange {
|
||||
range,
|
||||
seen_str,
|
||||
timeout_msec,
|
||||
};
|
||||
|
||||
// Send the request to all nodes, use FuturesUnordered to get the responses in any order
|
||||
let msg = msg.into_req().map_err(netapp::error::Error::from)?;
|
||||
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
|
||||
let mut requests = nodes
|
||||
.iter()
|
||||
.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
// Fetch responses. This procedure stops fetching responses when any of the following
|
||||
// conditions arise:
|
||||
// - we have a response to all requests
|
||||
// - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
|
||||
// has passed since the quorum was achieved
|
||||
// - a global RPC timeout expired
|
||||
// The extra delay after a quorum was received is usefull if the third response was to
|
||||
// arrive during this short interval: this would allow us to consider all the data seen
|
||||
// by that last node in the response we produce, and would likely help reduce the
|
||||
// size of the seen marker that we will return (because we would have an info of the
|
||||
// kind: all items produced by that node until time ts have been returned, so we can
|
||||
// bump the entry in the global vector clock and possibly remove some item-specific
|
||||
// vector clocks)
|
||||
let mut deadline =
|
||||
Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
|
||||
let mut resps = vec![];
|
||||
let mut errors = vec![];
|
||||
loop {
|
||||
select! {
|
||||
_ = tokio::time::sleep_until(deadline.into()) => {
|
||||
break;
|
||||
}
|
||||
res = requests.next() => match res {
|
||||
None => break,
|
||||
Some(Err(e)) => errors.push(e),
|
||||
Some(Ok(r)) => {
|
||||
resps.push(r);
|
||||
if resps.len() >= quorum {
|
||||
deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if errors.len() > nodes.len() - quorum {
|
||||
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
|
||||
return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
|
||||
}
|
||||
|
||||
// Take all returned items into account to produce the response.
|
||||
let mut new_items = BTreeMap::<String, K2VItem>::new();
|
||||
for v in resps {
|
||||
if let K2VRpc::PollRangeResponse(node, items) = v {
|
||||
seen.mark_seen_node_items(node, items.iter());
|
||||
for item in items.into_iter() {
|
||||
match new_items.get_mut(&item.sort_key) {
|
||||
Some(ent) => {
|
||||
ent.merge(&item);
|
||||
}
|
||||
None => {
|
||||
new_items.insert(item.sort_key.clone(), item);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::unexpected_rpc_message(v).into());
|
||||
}
|
||||
}
|
||||
|
||||
if new_items.is_empty() && has_seen_marker {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some((new_items, seen.encode()?)))
|
||||
}
|
||||
}
|
||||
|
||||
// ---- internal handlers ----
|
||||
|
||||
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
|
||||
let new = self.local_insert(item)?;
|
||||
let new = {
|
||||
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
|
||||
self.local_insert(&local_timestamp_tree, item)?
|
||||
};
|
||||
|
||||
// Propagate to rest of network
|
||||
if let Some(updated) = new {
|
||||
|
@ -256,11 +388,14 @@ impl K2VRpcHandler {
|
|||
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
|
||||
let mut updated_vec = vec![];
|
||||
|
||||
for item in items {
|
||||
let new = self.local_insert(item)?;
|
||||
{
|
||||
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
|
||||
for item in items {
|
||||
let new = self.local_insert(&local_timestamp_tree, item)?;
|
||||
|
||||
if let Some(updated) = new {
|
||||
updated_vec.push(updated);
|
||||
if let Some(updated) = new {
|
||||
updated_vec.push(updated);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,10 +407,22 @@ impl K2VRpcHandler {
|
|||
Ok(K2VRpc::Ok)
|
||||
}
|
||||
|
||||
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
||||
fn local_insert(
|
||||
&self,
|
||||
local_timestamp_tree: &MutexGuard<'_, db::Tree>,
|
||||
item: &InsertedItem,
|
||||
) -> Result<Option<K2VItem>, Error> {
|
||||
let now = now_msec();
|
||||
|
||||
self.item_table
|
||||
.data
|
||||
.update_entry_with(&item.partition, &item.sort_key, |ent| {
|
||||
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
|
||||
let old_local_timestamp = tx
|
||||
.get(&local_timestamp_tree, TIMESTAMP_KEY)?
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.map(u64::from_be_bytes)
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut ent = ent.unwrap_or_else(|| {
|
||||
K2VItem::new(
|
||||
item.partition.bucket_id,
|
||||
|
@ -283,13 +430,25 @@ impl K2VRpcHandler {
|
|||
item.sort_key.clone(),
|
||||
)
|
||||
});
|
||||
ent.update(self.system.id, &item.causal_context, item.value.clone());
|
||||
ent
|
||||
let new_local_timestamp = ent.update(
|
||||
self.system.id,
|
||||
&item.causal_context,
|
||||
item.value.clone(),
|
||||
std::cmp::max(old_local_timestamp, now),
|
||||
);
|
||||
|
||||
tx.insert(
|
||||
&local_timestamp_tree,
|
||||
TIMESTAMP_KEY,
|
||||
u64::to_be_bytes(new_local_timestamp),
|
||||
)?;
|
||||
|
||||
Ok(ent)
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
|
||||
let mut chan = self.subscriptions.subscribe(key);
|
||||
async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
|
||||
let mut chan = self.subscriptions.subscribe_item(key);
|
||||
|
||||
let mut value = self
|
||||
.item_table
|
||||
|
@ -311,6 +470,71 @@ impl K2VRpcHandler {
|
|||
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
async fn handle_poll_range(
|
||||
&self,
|
||||
range: &PollRange,
|
||||
seen_str: &Option<String>,
|
||||
) -> Result<Vec<K2VItem>, Error> {
|
||||
if let Some(seen_str) = seen_str {
|
||||
let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
|
||||
|
||||
// Subscribe now to all changes on that partition,
|
||||
// so that new items that are inserted while we are reading the range
|
||||
// will be seen in the loop below
|
||||
let mut chan = self.subscriptions.subscribe_partition(&range.partition);
|
||||
|
||||
// Check for the presence of any new items already stored in the item table
|
||||
let mut new_items = self.poll_range_read_range(range, &seen)?;
|
||||
|
||||
// If we found no new items, wait for a matching item to arrive
|
||||
// on the channel
|
||||
while new_items.is_empty() {
|
||||
let item = chan.recv().await?;
|
||||
if range.matches(&item) && seen.is_new_item(&item) {
|
||||
new_items.push(item);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_items)
|
||||
} else {
|
||||
// If no seen marker was specified, we do not poll for anything.
|
||||
// We return immediately with the set of known items (even if
|
||||
// it is empty), which will give the client an inital view of
|
||||
// the dataset and an initial seen marker for further
|
||||
// PollRange calls.
|
||||
self.poll_range_read_range(range, &RangeSeenMarker::default())
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_range_read_range(
|
||||
&self,
|
||||
range: &PollRange,
|
||||
seen: &RangeSeenMarker,
|
||||
) -> Result<Vec<K2VItem>, Error> {
|
||||
let mut new_items = vec![];
|
||||
|
||||
let partition_hash = range.partition.hash();
|
||||
let first_key = match &range.start {
|
||||
None => partition_hash.to_vec(),
|
||||
Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
|
||||
};
|
||||
for item in self.item_table.data.store.range(first_key..)? {
|
||||
let (key, value) = item?;
|
||||
if &key[..32] != partition_hash.as_slice() {
|
||||
break;
|
||||
}
|
||||
let item = self.item_table.data.decode_entry(&value)?;
|
||||
if !range.matches(&item) {
|
||||
break;
|
||||
}
|
||||
if seen.is_new_item(&item) {
|
||||
new_items.push(item);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_items)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
|
|||
} => {
|
||||
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
|
||||
select! {
|
||||
ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
|
||||
ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
|
||||
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
|
||||
}
|
||||
}
|
||||
K2VRpc::PollRange {
|
||||
range,
|
||||
seen_str,
|
||||
timeout_msec,
|
||||
} => {
|
||||
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
|
||||
select! {
|
||||
ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
|
||||
_ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
|
||||
}
|
||||
}
|
||||
m => Err(Error::unexpected_rpc_message(m)),
|
||||
}
|
||||
}
|
||||
|
|
105
src/model/k2v/seen.rs
Normal file
105
src/model/k2v/seen.rs
Normal file
|
@ -0,0 +1,105 @@
|
|||
//! Implements a RangeSeenMarker, a data type used in the PollRange API
|
||||
//! to indicate which items in the range have already been seen
|
||||
//! and which have not been seen yet.
|
||||
//!
|
||||
//! It consists of a vector clock that indicates that for each node,
|
||||
//! all items produced by that node with timestamps <= the value in the
|
||||
//! vector clock has been seen, as well as a set of causal contexts for
|
||||
//! individual items.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use base64::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::data::Uuid;
|
||||
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
|
||||
use garage_util::error::Error;
|
||||
|
||||
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
|
||||
use crate::k2v::causality::*;
|
||||
use crate::k2v::item_table::*;
|
||||
use crate::k2v::sub::*;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
pub struct RangeSeenMarker {
|
||||
vector_clock: VectorClock,
|
||||
items: BTreeMap<String, VectorClock>,
|
||||
}
|
||||
|
||||
impl RangeSeenMarker {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn restrict(&mut self, range: &PollRange) {
|
||||
if let Some(start) = &range.start {
|
||||
self.items = self.items.split_off(start);
|
||||
}
|
||||
if let Some(end) = &range.end {
|
||||
self.items.split_off(end);
|
||||
}
|
||||
if let Some(pfx) = &range.prefix {
|
||||
self.items.retain(|k, _v| k.starts_with(pfx));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
|
||||
&mut self,
|
||||
node: Uuid,
|
||||
items: I,
|
||||
) {
|
||||
let node = make_node_id(node);
|
||||
for item in items.into_iter() {
|
||||
let cc = item.causal_context();
|
||||
|
||||
if let Some(ts) = cc.vector_clock.get(&node) {
|
||||
let ent = self.vector_clock.entry(node).or_insert(0);
|
||||
*ent = std::cmp::max(*ent, *ts);
|
||||
}
|
||||
|
||||
if vclock_gt(&cc.vector_clock, &self.vector_clock) {
|
||||
match self.items.get_mut(&item.sort_key) {
|
||||
None => {
|
||||
self.items.insert(item.sort_key.clone(), cc.vector_clock);
|
||||
}
|
||||
Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn canonicalize(&mut self) {
|
||||
let self_vc = &self.vector_clock;
|
||||
self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
|
||||
}
|
||||
|
||||
pub fn encode(&mut self) -> Result<String, Error> {
|
||||
self.canonicalize();
|
||||
|
||||
let bytes = nonversioned_encode(&self)?;
|
||||
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
|
||||
Ok(BASE64_STANDARD.encode(&bytes))
|
||||
}
|
||||
|
||||
/// Decode from msgpack+zstd+b64 representation, returns None on error.
|
||||
pub fn decode(s: &str) -> Option<Self> {
|
||||
let bytes = BASE64_STANDARD.decode(&s).ok()?;
|
||||
let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
|
||||
nonversioned_decode(&bytes).ok()
|
||||
}
|
||||
|
||||
pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
|
||||
Self::decode(s).ok_or_bad_request("Invalid causality token")
|
||||
}
|
||||
|
||||
pub fn is_new_item(&self, item: &K2VItem) -> bool {
|
||||
let cc = item.causal_context();
|
||||
vclock_gt(&cc.vector_clock, &self.vector_clock)
|
||||
&& self
|
||||
.items
|
||||
.get(&item.sort_key)
|
||||
.map(|vc| vclock_gt(&cc.vector_clock, &vc))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
110
src/model/k2v/sub.rs
Normal file
110
src/model/k2v/sub.rs
Normal file
|
@ -0,0 +1,110 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::k2v::item_table::*;
|
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct PollKey {
|
||||
pub partition: K2VItemPartition,
|
||||
pub sort_key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct PollRange {
|
||||
pub partition: K2VItemPartition,
|
||||
pub prefix: Option<String>,
|
||||
pub start: Option<String>,
|
||||
pub end: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SubscriptionManagerInner {
|
||||
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
|
||||
part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
|
||||
}
|
||||
|
||||
impl SubscriptionManager {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
if let Some(s) = inner.item_subscriptions.get(key) {
|
||||
s.subscribe()
|
||||
} else {
|
||||
let (tx, rx) = broadcast::channel(8);
|
||||
inner.item_subscriptions.insert(key.clone(), tx);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn subscribe_partition(
|
||||
&self,
|
||||
part: &K2VItemPartition,
|
||||
) -> broadcast::Receiver<K2VItem> {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
if let Some(s) = inner.part_subscriptions.get(part) {
|
||||
s.subscribe()
|
||||
} else {
|
||||
let (tx, rx) = broadcast::channel(8);
|
||||
inner.part_subscriptions.insert(part.clone(), tx);
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn notify(&self, item: &K2VItem) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
|
||||
// 1. Notify single item subscribers,
|
||||
// removing subscriptions with no more listeners if any
|
||||
let key = PollKey {
|
||||
partition: item.partition.clone(),
|
||||
sort_key: item.sort_key.clone(),
|
||||
};
|
||||
if let Some(s) = inner.item_subscriptions.get(&key) {
|
||||
if s.send(item.clone()).is_err() {
|
||||
// no more subscribers, remove channel from here
|
||||
// (we will re-create it later if we need to subscribe again)
|
||||
inner.item_subscriptions.remove(&key);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Notify partition subscribers,
|
||||
// removing subscriptions with no more listeners if any
|
||||
if let Some(s) = inner.part_subscriptions.get(&item.partition) {
|
||||
if s.send(item.clone()).is_err() {
|
||||
// no more subscribers, remove channel from here
|
||||
// (we will re-create it later if we need to subscribe again)
|
||||
inner.part_subscriptions.remove(&item.partition);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PollRange {
|
||||
pub fn matches(&self, item: &K2VItem) -> bool {
|
||||
item.partition == self.partition
|
||||
&& self
|
||||
.prefix
|
||||
.as_ref()
|
||||
.map(|x| item.sort_key.starts_with(x))
|
||||
.unwrap_or(true)
|
||||
&& self
|
||||
.start
|
||||
.as_ref()
|
||||
.map(|x| item.sort_key >= *x)
|
||||
.unwrap_or(true)
|
||||
&& self
|
||||
.end
|
||||
.as_ref()
|
||||
.map(|x| item.sort_key < *x)
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
|
@ -15,10 +15,9 @@ use opentelemetry::{
|
|||
};
|
||||
|
||||
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
|
||||
use netapp::message::IntoReq;
|
||||
pub use netapp::message::{
|
||||
Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
|
||||
PRIO_SECONDARY,
|
||||
IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
|
||||
PRIO_NORMAL, PRIO_SECONDARY,
|
||||
};
|
||||
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
|
||||
pub use netapp::{self, NetApp, NodeID};
|
||||
|
|
|
@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
||||
let update = self.decode_entry(update_bytes)?;
|
||||
|
||||
self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
|
||||
Some(mut ent) => {
|
||||
ent.merge(&update);
|
||||
ent
|
||||
}
|
||||
None => update.clone(),
|
||||
})?;
|
||||
self.update_entry_with(
|
||||
update.partition_key(),
|
||||
update.sort_key(),
|
||||
|_tx, ent| match ent {
|
||||
Some(mut ent) => {
|
||||
ent.merge(&update);
|
||||
Ok(ent)
|
||||
}
|
||||
None => Ok(update.clone()),
|
||||
},
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
&self,
|
||||
partition_key: &F::P,
|
||||
sort_key: &F::S,
|
||||
f: impl Fn(Option<F::E>) -> F::E,
|
||||
update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>,
|
||||
) -> Result<Option<F::E>, Error> {
|
||||
let tree_key = self.tree_key(partition_key, sort_key);
|
||||
|
||||
|
@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
|||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||
Some(old_bytes) => {
|
||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||
let new_entry = f(Some(old_entry.clone()));
|
||||
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
|
||||
(Some(old_entry), Some(old_bytes), new_entry)
|
||||
}
|
||||
None => (None, None, f(None)),
|
||||
None => (None, None, update_fn(&mut tx, None)?),
|
||||
};
|
||||
|
||||
// Changed can be true in two scenarios
|
||||
|
|
Loading…
Add table
Reference in a new issue