diff --git a/Cargo.lock b/Cargo.lock index b0326c71..c9ca1f58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,10 +403,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" dependencies = [ "bitflags", - "textwrap", + "textwrap 0.11.0", "unicode-width", ] +[[package]] +name = "clap" +version = "3.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "clap_lex", + "indexmap", + "lazy_static", + "strsim", + "termcolor", + "textwrap 0.15.0", +] + +[[package]] +name = "clap_derive" +version = "3.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c" +dependencies = [ + "heck 0.4.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "cloudabi" version = "0.0.3" @@ -1271,6 +1310,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1548,6 +1593,7 @@ name = "k2v-client" version = "0.1.0" dependencies = [ "base64", + "clap 3.1.18", "http", "rusoto_core", "rusoto_credential", @@ -2092,6 +2138,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "os_str_bytes" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" + [[package]] name = "parking_lot" version = "0.11.2" @@ -2341,7 +2393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ "bytes 1.1.0", - "heck", + "heck 0.3.3", "itertools 0.10.3", "lazy_static", "log", @@ -2986,7 +3038,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "structopt-derive", ] @@ -2997,7 +3049,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" dependencies = [ - "heck", + "heck 0.3.3", "proc-macro-error", "proc-macro2", "quote", @@ -3065,6 +3117,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "textwrap" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" + [[package]] name = "thiserror" version = "1.0.31" diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index 358963d7..07de9d05 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -13,3 +13,14 @@ serde = "1.0.137" serde_json = "1.0.81" thiserror = "1.0.31" tokio = "1.17.0" + +# cli deps +clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } + + +[features] +cli = ["clap", "tokio/fs", "tokio/io-std"] + +[[bin]] +name = "k2v-cli" +required-features = ["cli"] diff --git a/src/k2v-client/src/bin/k2v-cli.rs b/src/k2v-client/src/bin/k2v-cli.rs new file mode 100644 index 00000000..4e09697d --- /dev/null +++ b/src/k2v-client/src/bin/k2v-cli.rs @@ -0,0 +1,466 @@ +use k2v_client::*; +use rusoto_core::credential::AwsCredentials; +use rusoto_core::Region; + +use clap::{Parser, Subcommand}; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Args { + /// Name of the region to use + #[clap(short, long, env = "AWS_REGION", default_value = "garage")] + region: String, + /// Url of the endpoint to connect to + #[clap(short, long, env = "K2V_ENDPOINT")] + endpoint: String, + /// Access key ID + #[clap(short, long, env = "AWS_ACCESS_KEY_ID")] + key_id: String, + /// Access key ID + #[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")] + secret: String, + /// Bucket name + #[clap(short, long, env = "K2V_BUCKET")] + bucket: String, + #[clap(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + /// Insert a single value + Insert { + /// Partition key to insert to + partition_key: String, + /// Sort key to insert to + sort_key: String, + /// Causality of the insertion + #[clap(short, long)] + causality: Option, + /// Value to insert + #[clap(flatten)] + value: Value, + }, + /// Read a single value + Read { + /// Partition key to read from + partition_key: String, + /// Sort key to read from + sort_key: String, + /// Output formating + #[clap(flatten)] + output_kind: ReadOutputKind, + }, + /// Delete a single value + Delete { + /// Partition key to delete from + partition_key: String, + /// Sort key to delete from + sort_key: String, + /// Causality information + #[clap(short, long)] + causality: String, + }, + /// List partition keys + ReadIndex { + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only partition keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Read a range of sort keys + ReadRange { + /// Partition key to read from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, + /// Delete a range of sort keys + DeleteRange { + /// Partition key to delete from + partition_key: String, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + /// Delete only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + }, +} + +/// Where to read a value from +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))] +struct Value { + /// Read value from a file. use - to read from stdin + #[clap(short, long, group = "value")] + file: Option, + /// Read a base64 value from commandline + #[clap(short, long, group = "value")] + b64: Option, + /// Read a raw (UTF-8) value from the commandline + #[clap(short, long, group = "value")] + text: Option, +} + +impl Value { + async fn to_data(&self) -> Result, Error> { + if let Some(ref text) = self.text { + Ok(text.as_bytes().to_vec()) + } else if let Some(ref b64) = self.b64 { + base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) + } else if let Some(ref path) = self.file { + use tokio::io::AsyncReadExt; + if path == "-" { + let mut file = tokio::io::stdin(); + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } else { + let mut file = tokio::fs::File::open(path).await?; + let mut vec = Vec::new(); + file.read_to_end(&mut vec).await?; + Ok(vec) + } + } else { + unreachable!("Value must have one option set") + } + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct ReadOutputKind { + /// Base64 output. Conflicts are line separated, first line is causality token + #[clap(short, long, group = "output-kind")] + b64: bool, + /// Raw output. Conflicts generate error, causality token is not returned + #[clap(short, long, group = "output-kind")] + raw: bool, + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +impl ReadOutputKind { + fn display_output(&self, val: CausalValue) -> ! { + use std::io::Write; + use std::process::exit; + + if self.json { + let stdout = std::io::stdout(); + serde_json::to_writer(stdout, &val).unwrap(); + exit(0); + } + + if self.raw { + let mut val = val.value; + if val.len() != 1 { + eprintln!( + "Raw mode can only read non-concurent values, fond {} values, expected 1", + val.len() + ); + exit(1); + } + let val = val.pop().unwrap(); + match val { + K2vValue::Value(v) => { + std::io::stdout().write_all(&v).unwrap(); + exit(0); + } + K2vValue::Tombstone => { + eprintln!("Expected value, found tombstone"); + exit(2); + } + } + } + + let causality: String = val.causality.into(); + if self.b64 { + println!("{}", causality); + for val in val.value { + match val { + K2vValue::Value(v) => { + println!("{}", base64::encode(&v)) + } + K2vValue::Tombstone => { + println!(); + } + } + } + exit(0); + } + + // human + println!("causality: {}", causality); + println!("values:"); + for val in val.value { + match val { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" utf-8: {}", string); + } else { + println!(" base64: {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + exit(0); + } +} + +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))] +struct BatchOutputKind { + /// Human formated output + #[clap(short = 'H', long, group = "output-kind")] + human: bool, + /// JSON formated output + #[clap(short, long, group = "output-kind")] + json: bool, +} + +/// Filter for batch operations +#[derive(Parser, Debug)] +#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] +struct Filter { + /// Match only keys starting with this prefix + #[clap(short, long, group = "filter")] + prefix: Option, + /// Match only keys lexicographically after this key (including this key itself) + #[clap(short, long, group = "filter")] + start: Option, + /// Match only keys lexicographically before this key (excluding this key) + #[clap(short, long, group = "filter")] + end: Option, + /// Only match the first X keys + #[clap(short, long)] + limit: Option, + /// Return keys in reverse order + #[clap(short, long)] + reverse: bool, + /// Return only keys where conflict happened + #[clap(short, long)] + conflicts_only: bool, + /// Return only keys storing tombstones + #[clap(short, long)] + tombstones: bool, + /// Return any key + #[clap(short, long, group = "filter")] + all: bool, +} + +impl Filter { + fn k2v_filter(&self) -> k2v_client::Filter<'_> { + k2v_client::Filter { + start: self.start.as_deref(), + end: self.end.as_deref(), + prefix: self.prefix.as_deref(), + limit: self.limit, + reverse: self.reverse, + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let args = Args::parse(); + + let region = Region::Custom { + name: args.region, + endpoint: args.endpoint, + }; + + let creds = AwsCredentials::new(args.key_id, args.secret, None, None); + + let client = K2vClient::new(region, args.bucket, creds, None)?; + + match args.command { + Command::Insert { + partition_key, + sort_key, + causality, + value, + } => { + client + .insert_item( + &partition_key, + &sort_key, + value.to_data().await?, + causality.map(Into::into), + ) + .await?; + } + Command::Delete { + partition_key, + sort_key, + causality, + } => { + client + .delete_item(&partition_key, &sort_key, causality.into()) + .await?; + } + Command::Read { + partition_key, + sort_key, + output_kind, + } => { + let res = client.read_item(&partition_key, &sort_key).await?; + output_kind.display_output(res); + } + Command::ReadIndex { + output_kind, + filter, + } => { + if filter.conflicts_only || filter.tombstones { + return Err(Error::Message( + "conlicts-only and tombstones are invalid for read-index".into(), + )); + } + let res = client.read_index(filter.k2v_filter()).await?; + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + println!("key: entries,conflicts,values,bytes"); + for (k, v) in res.items { + println!( + "{}: {},{},{},{}", + k, v.entries, v.conflicts, v.values, v.bytes + ); + } + } + } + Command::ReadRange { + partition_key, + output_kind, + filter, + } => { + let op = BatchReadOp { + partition_key: &partition_key, + filter: filter.k2v_filter(), + conflicts_only: filter.conflicts_only, + include_tombstones: filter.tombstones, + single_item: false, + }; + let mut res = client.read_batch(&[op]).await?; + let res = res.pop().unwrap(); + if output_kind.json { + let values = res + .items + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::>(); + let json = serde_json::json!({ + "next_key": res.next_start, + "values": values, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer(stdout, &json).unwrap(); + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + for (key, values) in res.items { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + } + } + Command::DeleteRange { + partition_key, + output_kind, + filter, + } => { + let single_item = if let Some(limit) = filter.limit { + if limit == 1 { + true + } else { + return Err(Error::Message( + "limit can only be 1 or no limit for delete-range".into(), + )); + } + } else { + false + }; + let op = BatchDeleteOp { + partition_key: &partition_key, + prefix: filter.prefix.as_deref(), + start: filter.start.as_deref(), + end: filter.end.as_deref(), + single_item, + }; + if filter.reverse || filter.conflicts_only || filter.tombstones { + return Err(Error::Message( + "conlicts-only, reverse and tombstones are invalid for read-index".into(), + )); + } + + let res = client.delete_batch(&[op]).await?; + + if output_kind.json { + println!("{}", res[0]); + } else { + println!("deleted {} keys", res[0]); + } + } + } + + Ok(()) +} diff --git a/src/k2v-client/src/error.rs b/src/k2v-client/src/error.rs index e519e141..62357934 100644 --- a/src/k2v-client/src/error.rs +++ b/src/k2v-client/src/error.rs @@ -17,4 +17,6 @@ pub enum Error { RusotoHttp(#[from] rusoto_core::HttpDispatchError), #[error("deserialization error: {0}")] Deserialization(#[from] serde_json::Error), + #[error("{0}")] + Message(Cow<'static, str>), } diff --git a/src/k2v-client/src/lib.rs b/src/k2v-client/src/lib.rs index a85c3773..c349975e 100644 --- a/src/k2v-client/src/lib.rs +++ b/src/k2v-client/src/lib.rs @@ -117,7 +117,7 @@ impl K2vClient { req.add_param("sort_key", sort_key); req.add_param("causality_token", &causality.0); req.add_param("timeout", &timeout.as_secs().to_string()); - req.add_header(ACCEPT, "application/octet-stream, application/json;q=0.9"); + req.add_header(ACCEPT, "application/octet-stream, application/json"); let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; @@ -197,7 +197,6 @@ impl K2vClient { Ok(()) } - // TODO poke team, draft doc outdated fot the return type of this endpoint /// Perform a ReadIndex request, listing partition key which have at least one associated /// sort key, and which matches the filter. pub async fn read_index( @@ -407,7 +406,7 @@ impl Serialize for K2vValue { } /// A set of K2vValue and associated causality information. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct CausalValue { pub causality: CausalityToken, pub value: Vec, @@ -471,7 +470,7 @@ struct ReadIndexItem { } /// Information about data stored with a given partition key. -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PartitionInfo { pub entries: u64, pub conflicts: u64, @@ -502,7 +501,7 @@ pub struct BatchReadOp<'a> { #[serde(default)] pub single_item: bool, #[serde(default)] - pub concflicts_only: bool, + pub conflicts_only: bool, #[serde(default)] pub include_tombstones: bool, } diff --git a/src/k2v-client/src/main.rs b/src/k2v-client/src/main.rs deleted file mode 100644 index 7ebeb820..00000000 --- a/src/k2v-client/src/main.rs +++ /dev/null @@ -1,47 +0,0 @@ -use k2v_client::*; -use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials}; -use rusoto_core::Region; - -#[tokio::main] -async fn main() -> Result<(), Error> { - // TODO provide a CLI to perform queries - let region = Region::Custom { - name: "us-east-1".to_owned(), - endpoint: "http://172.30.2.1:3903".to_owned(), - }; - - let creds = EnvironmentProvider::default().credentials().await.unwrap(); - - let client = K2vClient::new(region, "my-bucket".to_owned(), creds, None)?; - - client.insert_item("pk", "sk", vec![0x12], None).await?; - - /* - dbg!(client.read_item("pk", "sk").await?); - - client.delete_item("patate", "patate", "eFmifSwRtcl4WaJ9LBG1ywAAAAAAAAAC".to_owned().into()).await?; - - dbg!(client.read_index(Filter::default()).await?); - - client.insert_batch(&[ - BatchInsertOp { - partition_key: "pk", - sort_key: "sk1", - causality: None, - value: vec![1,2,3].into(), - }, - BatchInsertOp { - partition_key: "pk", - sort_key: "sk2", - causality: None, - value: vec![1,2,4].into(), - }, - ]).await?; - - dbg!(client.read_batch(&[BatchReadOp { partition_key: "pk", ..BatchReadOp::default()}]).await?); - - dbg!(client.delete_batch(&[BatchDeleteOp::new("pk")]).await?); - */ - - Ok(()) -}