and fix some minor issues
This commit is contained in:
trinity-1686a 2022-05-16 18:15:39 +02:00
parent d0e3434104
commit 0deaf882cd
6 changed files with 545 additions and 56 deletions

66
Cargo.lock generated
View file

@ -403,10 +403,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"textwrap", "textwrap 0.11.0",
"unicode-width", "unicode-width",
] ]
[[package]]
name = "clap"
version = "3.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2dbdf4bdacb33466e854ce889eee8dfd5729abf7ccd7664d0a2d60cd384440b"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"lazy_static",
"strsim",
"termcolor",
"textwrap 0.15.0",
]
[[package]]
name = "clap_derive"
version = "3.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25320346e922cffe59c0bbc5410c8d8784509efb321488971081313cb1e1a33c"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213"
dependencies = [
"os_str_bytes",
]
[[package]] [[package]]
name = "cloudabi" name = "cloudabi"
version = "0.0.3" version = "0.0.3"
@ -1271,6 +1310,12 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -1548,6 +1593,7 @@ name = "k2v-client"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"base64", "base64",
"clap 3.1.18",
"http", "http",
"rusoto_core", "rusoto_core",
"rusoto_credential", "rusoto_credential",
@ -2092,6 +2138,12 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "os_str_bytes"
version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -2341,7 +2393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"heck", "heck 0.3.3",
"itertools 0.10.3", "itertools 0.10.3",
"lazy_static", "lazy_static",
"log", "log",
@ -2986,7 +3038,7 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10"
dependencies = [ dependencies = [
"clap", "clap 2.34.0",
"lazy_static", "lazy_static",
"structopt-derive", "structopt-derive",
] ]
@ -2997,7 +3049,7 @@ version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0" checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [ dependencies = [
"heck", "heck 0.3.3",
"proc-macro-error", "proc-macro-error",
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3065,6 +3117,12 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "textwrap"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.31" version = "1.0.31"

View file

@ -13,3 +13,14 @@ serde = "1.0.137"
serde_json = "1.0.81" serde_json = "1.0.81"
thiserror = "1.0.31" thiserror = "1.0.31"
tokio = "1.17.0" tokio = "1.17.0"
# cli deps
clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
[features]
cli = ["clap", "tokio/fs", "tokio/io-std"]
[[bin]]
name = "k2v-cli"
required-features = ["cli"]

View file

@ -0,0 +1,466 @@
use k2v_client::*;
use rusoto_core::credential::AwsCredentials;
use rusoto_core::Region;
use clap::{Parser, Subcommand};
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// Name of the region to use
#[clap(short, long, env = "AWS_REGION", default_value = "garage")]
region: String,
/// Url of the endpoint to connect to
#[clap(short, long, env = "K2V_ENDPOINT")]
endpoint: String,
/// Access key ID
#[clap(short, long, env = "AWS_ACCESS_KEY_ID")]
key_id: String,
/// Access key ID
#[clap(short, long, env = "AWS_SECRET_ACCESS_KEY")]
secret: String,
/// Bucket name
#[clap(short, long, env = "K2V_BUCKET")]
bucket: String,
#[clap(subcommand)]
command: Command,
}
#[derive(Subcommand, Debug)]
enum Command {
/// Insert a single value
Insert {
/// Partition key to insert to
partition_key: String,
/// Sort key to insert to
sort_key: String,
/// Causality of the insertion
#[clap(short, long)]
causality: Option<String>,
/// Value to insert
#[clap(flatten)]
value: Value,
},
/// Read a single value
Read {
/// Partition key to read from
partition_key: String,
/// Sort key to read from
sort_key: String,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
/// Delete a single value
Delete {
/// Partition key to delete from
partition_key: String,
/// Sort key to delete from
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
},
/// List partition keys
ReadIndex {
/// Output formating
#[clap(flatten)]
output_kind: BatchOutputKind,
/// Output only partition keys matching this filter
#[clap(flatten)]
filter: Filter,
},
/// Read a range of sort keys
ReadRange {
/// Partition key to read from
partition_key: String,
/// Output formating
#[clap(flatten)]
output_kind: BatchOutputKind,
/// Output only sort keys matching this filter
#[clap(flatten)]
filter: Filter,
},
/// Delete a range of sort keys
DeleteRange {
/// Partition key to delete from
partition_key: String,
/// Output formating
#[clap(flatten)]
output_kind: BatchOutputKind,
/// Delete only sort keys matching this filter
#[clap(flatten)]
filter: Filter,
},
}
/// Where to read a value from
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("value").multiple(false).required(true))]
struct Value {
/// Read value from a file. use - to read from stdin
#[clap(short, long, group = "value")]
file: Option<String>,
/// Read a base64 value from commandline
#[clap(short, long, group = "value")]
b64: Option<String>,
/// Read a raw (UTF-8) value from the commandline
#[clap(short, long, group = "value")]
text: Option<String>,
}
impl Value {
async fn to_data(&self) -> Result<Vec<u8>, Error> {
if let Some(ref text) = self.text {
Ok(text.as_bytes().to_vec())
} else if let Some(ref b64) = self.b64 {
base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into()))
} else if let Some(ref path) = self.file {
use tokio::io::AsyncReadExt;
if path == "-" {
let mut file = tokio::io::stdin();
let mut vec = Vec::new();
file.read_to_end(&mut vec).await?;
Ok(vec)
} else {
let mut file = tokio::fs::File::open(path).await?;
let mut vec = Vec::new();
file.read_to_end(&mut vec).await?;
Ok(vec)
}
} else {
unreachable!("Value must have one option set")
}
}
}
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
struct ReadOutputKind {
/// Base64 output. Conflicts are line separated, first line is causality token
#[clap(short, long, group = "output-kind")]
b64: bool,
/// Raw output. Conflicts generate error, causality token is not returned
#[clap(short, long, group = "output-kind")]
raw: bool,
/// Human formated output
#[clap(short = 'H', long, group = "output-kind")]
human: bool,
/// JSON formated output
#[clap(short, long, group = "output-kind")]
json: bool,
}
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
use std::process::exit;
if self.json {
let stdout = std::io::stdout();
serde_json::to_writer(stdout, &val).unwrap();
exit(0);
}
if self.raw {
let mut val = val.value;
if val.len() != 1 {
eprintln!(
"Raw mode can only read non-concurent values, fond {} values, expected 1",
val.len()
);
exit(1);
}
let val = val.pop().unwrap();
match val {
K2vValue::Value(v) => {
std::io::stdout().write_all(&v).unwrap();
exit(0);
}
K2vValue::Tombstone => {
eprintln!("Expected value, found tombstone");
exit(2);
}
}
}
let causality: String = val.causality.into();
if self.b64 {
println!("{}", causality);
for val in val.value {
match val {
K2vValue::Value(v) => {
println!("{}", base64::encode(&v))
}
K2vValue::Tombstone => {
println!();
}
}
}
exit(0);
}
// human
println!("causality: {}", causality);
println!("values:");
for val in val.value {
match val {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" utf-8: {}", string);
} else {
println!(" base64: {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
exit(0);
}
}
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("output-kind").multiple(false).required(false))]
struct BatchOutputKind {
/// Human formated output
#[clap(short = 'H', long, group = "output-kind")]
human: bool,
/// JSON formated output
#[clap(short, long, group = "output-kind")]
json: bool,
}
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
struct Filter {
/// Match only keys starting with this prefix
#[clap(short, long, group = "filter")]
prefix: Option<String>,
/// Match only keys lexicographically after this key (including this key itself)
#[clap(short, long, group = "filter")]
start: Option<String>,
/// Match only keys lexicographically before this key (excluding this key)
#[clap(short, long, group = "filter")]
end: Option<String>,
/// Only match the first X keys
#[clap(short, long)]
limit: Option<u64>,
/// Return keys in reverse order
#[clap(short, long)]
reverse: bool,
/// Return only keys where conflict happened
#[clap(short, long)]
conflicts_only: bool,
/// Return only keys storing tombstones
#[clap(short, long)]
tombstones: bool,
/// Return any key
#[clap(short, long, group = "filter")]
all: bool,
}
impl Filter {
fn k2v_filter(&self) -> k2v_client::Filter<'_> {
k2v_client::Filter {
start: self.start.as_deref(),
end: self.end.as_deref(),
prefix: self.prefix.as_deref(),
limit: self.limit,
reverse: self.reverse,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let args = Args::parse();
let region = Region::Custom {
name: args.region,
endpoint: args.endpoint,
};
let creds = AwsCredentials::new(args.key_id, args.secret, None, None);
let client = K2vClient::new(region, args.bucket, creds, None)?;
match args.command {
Command::Insert {
partition_key,
sort_key,
causality,
value,
} => {
client
.insert_item(
&partition_key,
&sort_key,
value.to_data().await?,
causality.map(Into::into),
)
.await?;
}
Command::Delete {
partition_key,
sort_key,
causality,
} => {
client
.delete_item(&partition_key, &sort_key, causality.into())
.await?;
}
Command::Read {
partition_key,
sort_key,
output_kind,
} => {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
Command::ReadIndex {
output_kind,
filter,
} => {
if filter.conflicts_only || filter.tombstones {
return Err(Error::Message(
"conlicts-only and tombstones are invalid for read-index".into(),
));
}
let res = client.read_index(filter.k2v_filter()).await?;
if output_kind.json {
let values = res
.items
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
let json = serde_json::json!({
"next_key": res.next_start,
"values": values,
});
let stdout = std::io::stdout();
serde_json::to_writer(stdout, &json).unwrap();
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
println!("key: entries,conflicts,values,bytes");
for (k, v) in res.items {
println!(
"{}: {},{},{},{}",
k, v.entries, v.conflicts, v.values, v.bytes
);
}
}
}
Command::ReadRange {
partition_key,
output_kind,
filter,
} => {
let op = BatchReadOp {
partition_key: &partition_key,
filter: filter.k2v_filter(),
conflicts_only: filter.conflicts_only,
include_tombstones: filter.tombstones,
single_item: false,
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
if output_kind.json {
let values = res
.items
.into_iter()
.map(|(k, v)| {
let mut value = serde_json::to_value(v).unwrap();
value
.as_object_mut()
.unwrap()
.insert("sort_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
let json = serde_json::json!({
"next_key": res.next_start,
"values": values,
});
let stdout = std::io::stdout();
serde_json::to_writer(stdout, &json).unwrap();
} else {
if let Some(next) = res.next_start {
println!("next key: {}", next);
}
for (key, values) in res.items {
println!("key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
match value {
K2vValue::Value(v) => {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
println!(" value(base64): {}", base64::encode(&v));
}
}
K2vValue::Tombstone => {
println!(" tombstone");
}
}
}
}
}
}
Command::DeleteRange {
partition_key,
output_kind,
filter,
} => {
let single_item = if let Some(limit) = filter.limit {
if limit == 1 {
true
} else {
return Err(Error::Message(
"limit can only be 1 or no limit for delete-range".into(),
));
}
} else {
false
};
let op = BatchDeleteOp {
partition_key: &partition_key,
prefix: filter.prefix.as_deref(),
start: filter.start.as_deref(),
end: filter.end.as_deref(),
single_item,
};
if filter.reverse || filter.conflicts_only || filter.tombstones {
return Err(Error::Message(
"conlicts-only, reverse and tombstones are invalid for read-index".into(),
));
}
let res = client.delete_batch(&[op]).await?;
if output_kind.json {
println!("{}", res[0]);
} else {
println!("deleted {} keys", res[0]);
}
}
}
Ok(())
}

View file

@ -17,4 +17,6 @@ pub enum Error {
RusotoHttp(#[from] rusoto_core::HttpDispatchError), RusotoHttp(#[from] rusoto_core::HttpDispatchError),
#[error("deserialization error: {0}")] #[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error), Deserialization(#[from] serde_json::Error),
#[error("{0}")]
Message(Cow<'static, str>),
} }

View file

@ -117,7 +117,7 @@ impl K2vClient {
req.add_param("sort_key", sort_key); req.add_param("sort_key", sort_key);
req.add_param("causality_token", &causality.0); req.add_param("causality_token", &causality.0);
req.add_param("timeout", &timeout.as_secs().to_string()); req.add_param("timeout", &timeout.as_secs().to_string());
req.add_header(ACCEPT, "application/octet-stream, application/json;q=0.9"); req.add_header(ACCEPT, "application/octet-stream, application/json");
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
@ -197,7 +197,6 @@ impl K2vClient {
Ok(()) Ok(())
} }
// TODO poke team, draft doc outdated fot the return type of this endpoint
/// Perform a ReadIndex request, listing partition key which have at least one associated /// Perform a ReadIndex request, listing partition key which have at least one associated
/// sort key, and which matches the filter. /// sort key, and which matches the filter.
pub async fn read_index( pub async fn read_index(
@ -407,7 +406,7 @@ impl Serialize for K2vValue {
} }
/// A set of K2vValue and associated causality information. /// A set of K2vValue and associated causality information.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize)]
pub struct CausalValue { pub struct CausalValue {
pub causality: CausalityToken, pub causality: CausalityToken,
pub value: Vec<K2vValue>, pub value: Vec<K2vValue>,
@ -471,7 +470,7 @@ struct ReadIndexItem {
} }
/// Information about data stored with a given partition key. /// Information about data stored with a given partition key.
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PartitionInfo { pub struct PartitionInfo {
pub entries: u64, pub entries: u64,
pub conflicts: u64, pub conflicts: u64,
@ -502,7 +501,7 @@ pub struct BatchReadOp<'a> {
#[serde(default)] #[serde(default)]
pub single_item: bool, pub single_item: bool,
#[serde(default)] #[serde(default)]
pub concflicts_only: bool, pub conflicts_only: bool,
#[serde(default)] #[serde(default)]
pub include_tombstones: bool, pub include_tombstones: bool,
} }

View file

@ -1,47 +0,0 @@
use k2v_client::*;
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
use rusoto_core::Region;
#[tokio::main]
async fn main() -> Result<(), Error> {
// TODO provide a CLI to perform queries
let region = Region::Custom {
name: "us-east-1".to_owned(),
endpoint: "http://172.30.2.1:3903".to_owned(),
};
let creds = EnvironmentProvider::default().credentials().await.unwrap();
let client = K2vClient::new(region, "my-bucket".to_owned(), creds, None)?;
client.insert_item("pk", "sk", vec![0x12], None).await?;
/*
dbg!(client.read_item("pk", "sk").await?);
client.delete_item("patate", "patate", "eFmifSwRtcl4WaJ9LBG1ywAAAAAAAAAC".to_owned().into()).await?;
dbg!(client.read_index(Filter::default()).await?);
client.insert_batch(&[
BatchInsertOp {
partition_key: "pk",
sort_key: "sk1",
causality: None,
value: vec![1,2,3].into(),
},
BatchInsertOp {
partition_key: "pk",
sort_key: "sk2",
causality: None,
value: vec![1,2,4].into(),
},
]).await?;
dbg!(client.read_batch(&[BatchReadOp { partition_key: "pk", ..BatchReadOp::default()}]).await?);
dbg!(client.delete_batch(&[BatchDeleteOp::new("pk")]).await?);
*/
Ok(())
}