From 32aab06929f09ce69bc49c4737b4801dd31a3b6f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 11:12:16 +0100 Subject: [PATCH] k2v-client libary poll_range and CLI poll-range --- Cargo.lock | 3 +- Cargo.nix | 9 +- src/k2v-client/Cargo.toml | 3 +- src/k2v-client/bin/k2v-cli.rs | 203 +++++++++++++++++++++++++--------- src/k2v-client/lib.rs | 89 ++++++++++++++- 5 files changed, 248 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb8b4f5c..83bc90e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1752,12 +1752,13 @@ dependencies = [ [[package]] name = "k2v-client" -version = "0.0.1" +version = "0.1.1" dependencies = [ "base64", "clap 3.1.18", "garage_util", "http", + "hyper-rustls 0.23.0", "log", "rusoto_core", "rusoto_credential", diff --git a/Cargo.nix b/Cargo.nix index 79601cdd..1bf2bb06 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9"; + nixifiedLockHash = "8f036894ab81a528f76e97e904ff3e496a9b1500569312489d444f615fb781bf"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -65,7 +65,7 @@ in garage_api = rustPackages.unknown.garage_api."0.8.1"; garage_web = rustPackages.unknown.garage_web."0.8.1"; garage = rustPackages.unknown.garage."0.8.1"; - k2v-client = rustPackages.unknown.k2v-client."0.0.1"; + k2v-client = rustPackages.unknown.k2v-client."0.1.1"; }; "registry+https://github.com/rust-lang/crates.io-index".addr2line."0.17.0" = overridableMkRustCrate (profileName: rec { name = "addr2line"; @@ -2434,9 +2434,9 @@ in }; }); - "unknown".k2v-client."0.0.1" = overridableMkRustCrate (profileName: rec { + "unknown".k2v-client."0.1.1" = overridableMkRustCrate (profileName: rec { name = "k2v-client"; - version = "0.0.1"; + version = "0.1.1"; registry = "unknown"; src = fetchCrateLocal (workspaceSrc + "/src/k2v-client"); features = builtins.concatLists [ @@ -2449,6 +2449,7 @@ in ${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."3.1.18" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/garage_util" then "garage_util" else null } = (rustPackages."unknown".garage_util."0.8.1" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out; + hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.0" { inherit profileName; }).out; log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }).out; rusoto_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_core."0.48.0" { inherit profileName; }).out; rusoto_credential = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusoto_credential."0.48.0" { inherit profileName; }).out; diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index f57ce849..d95a1b3f 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k2v-client" -version = "0.0.1" +version = "0.1.1" authors = ["Trinity Pointard ", "Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -15,6 +15,7 @@ log = "0.4" rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } rusoto_credential = "0.48.0" rusoto_signature = "0.48.0" +hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12", "logging" ] } serde = "1.0.137" serde_json = "1.0.81" thiserror = "1.0.31" diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs index 925ebeb8..cdd63cce 100644 --- a/src/k2v-client/bin/k2v-cli.rs +++ b/src/k2v-client/bin/k2v-cli.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; +use std::process::exit; use std::time::Duration; use k2v_client::*; @@ -57,22 +59,39 @@ enum Command { #[clap(flatten)] output_kind: ReadOutputKind, }, - /// Watch changes on a single value - Poll { - /// Partition key to delete from + /// Watch changes on a single value + PollItem { + /// Partition key of item to watch partition_key: String, - /// Sort key to delete from + /// Sort key of item to watch sort_key: String, /// Causality information #[clap(short, long)] causality: String, /// Timeout, in seconds - #[clap(short, long)] + #[clap(short = 'T', long)] timeout: Option, /// Output formating #[clap(flatten)] output_kind: ReadOutputKind, }, + /// Watch changes on a range of values + PollRange { + /// Partition key to poll from + partition_key: String, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + /// Marker of data that had previously been seen by a PollRange + #[clap(short = 'S', long)] + seen_marker: Option, + /// Timeout, in seconds + #[clap(short = 'T', long)] + timeout: Option, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + }, /// Delete a single value Delete { /// Partition key to delete from @@ -176,7 +195,6 @@ struct ReadOutputKind { impl ReadOutputKind { fn display_output(&self, val: CausalValue) -> ! { use std::io::Write; - use std::process::exit; if self.json { let stdout = std::io::stdout(); @@ -254,6 +272,83 @@ struct BatchOutputKind { json: bool, } +impl BatchOutputKind { + fn display_human_output(&self, values: BTreeMap) -> ! { + for (key, values) in values { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + exit(0); + } + + fn values_json(&self, values: BTreeMap) -> Vec { + values + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>() + } + + fn display_poll_range_output( + &self, + seen_marker: String, + values: BTreeMap, + ) -> ! { + if self.json { + let json = serde_json::json!({ + "values": self.values_json(values), + "seen_marker": seen_marker, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + println!("seen marker: {}", seen_marker); + self.display_human_output(values) + } + } + + fn display_read_range_output(&self, res: PaginatedRange) -> ! { + if self.json { + let json = serde_json::json!({ + "next_key": res.next_start, + "values": self.values_json(res.items), + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + self.display_human_output(res.items) + } + } +} + /// Filter for batch operations #[derive(Parser, Debug)] #[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] @@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> { let res = client.read_item(&partition_key, &sort_key).await?; output_kind.display_output(res); } - Command::Poll { + Command::PollItem { partition_key, sort_key, causality, @@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> { if let Some(res) = res_opt { output_kind.display_output(res); } else { - println!("Delay expired and value didn't change."); + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } + } + Command::PollRange { + partition_key, + filter, + seen_marker, + timeout, + output_kind, + } => { + if filter.conflicts_only + || filter.tombstones + || filter.reverse + || filter.limit.is_some() + { + return Err(Error::Message( + "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(), + )); + } + + let timeout = timeout.map(Duration::from_secs); + let res = client + .poll_range( + &partition_key, + Some(PollRangeFilter { + start: filter.start.as_deref(), + end: filter.end.as_deref(), + prefix: filter.prefix.as_deref(), + }), + seen_marker.as_deref(), + timeout, + ) + .await?; + match res { + Some((items, seen_marker)) => { + output_kind.display_poll_range_output(seen_marker, items); + } + None => { + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } } } Command::ReadIndex { @@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> { }; let mut res = client.read_batch(&[op]).await?; let res = res.pop().unwrap(); - if output_kind.json { - let values = res - .items - .into_iter() - .map(|(k, v)| { - let mut value = serde_json::to_value(v).unwrap(); - value - .as_object_mut() - .unwrap() - .insert("sort_key".to_owned(), k.into()); - value - }) - .collect::>(); - let json = serde_json::json!({ - "next_key": res.next_start, - "values": values, - }); - - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &json).unwrap(); - } else { - if let Some(next) = res.next_start { - println!("next key: {}", next); - } - for (key, values) in res.items { - println!("key: {}", key); - let causality: String = values.causality.into(); - println!("causality: {}", causality); - for value in values.value { - match value { - K2vValue::Value(v) => { - if let Ok(string) = std::str::from_utf8(&v) { - println!(" value(utf-8): {}", string); - } else { - println!(" value(base64): {}", base64::encode(&v)); - } - } - K2vValue::Tombstone => { - println!(" tombstone"); - } - } - } - } - } + output_kind.display_read_range_output(res); } Command::DeleteRange { partition_key, diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index c2606af4..ca52d0cf 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -40,7 +40,13 @@ impl K2vClient { creds: AwsCredentials, user_agent: Option, ) -> Result { - let mut client = HttpClient::new()?; + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let mut client = HttpClient::from_connector(connector); if let Some(ua) = user_agent { client.local_agent_prepend(ua); } else { @@ -153,6 +159,58 @@ impl K2vClient { } } + /// Perform a PollRange request, waiting for any change in a given range of keys + /// to occur + pub async fn poll_range( + &self, + partition_key: &str, + filter: Option>, + seen_marker: Option<&str>, + timeout: Option, + ) -> Result, String)>, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let request = PollRangeRequest { + filter: filter.unwrap_or_default(), + seen_marker, + timeout: timeout.as_secs(), + }; + + let mut req = SignedRequest::new( + "POST", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("poll_range", ""); + + let payload = serde_json::to_vec(&request)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + let resp: PollRangeResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect::>(); + + Ok(Some((items, resp.seen_marker))) + } + /// Perform an InsertItem request, inserting a value for a single pk+sk. pub async fn insert_item( &self, @@ -389,6 +447,12 @@ impl From for String { } } +impl AsRef for CausalityToken { + fn as_ref(&self) -> &str { + &self.0 + } +} + /// A value in K2V. can be either a binary value, or a tombstone. #[derive(Debug, Clone, PartialEq, Eq)] pub enum K2vValue { @@ -466,6 +530,29 @@ pub struct Filter<'a> { pub reverse: bool, } +#[derive(Debug, Default, Clone, Serialize)] +pub struct PollRangeFilter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeRequest<'a> { + #[serde(flatten)] + filter: PollRangeFilter<'a>, + seen_marker: Option<&'a str>, + timeout: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeResponse { + items: Vec, + seen_marker: String, +} + impl<'a> Filter<'a> { fn insert_params(&self, req: &mut SignedRequest) { if let Some(start) = &self.start {