add a crate k2v client #303
7 changed files with 1145 additions and 235 deletions
144
Cargo.lock
generated
144
Cargo.lock
generated
|
@ -504,6 +504,16 @@ dependencies = [
|
||||||
"subtle",
|
"subtle",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crypto-mac"
|
||||||
|
version = "0.11.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714"
|
||||||
|
dependencies = [
|
||||||
|
"generic-array",
|
||||||
|
"subtle",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ct-logs"
|
name = "ct-logs"
|
||||||
version = "0.8.0"
|
version = "0.8.0"
|
||||||
|
@ -848,7 +858,7 @@ dependencies = [
|
||||||
"garage_web",
|
"garage_web",
|
||||||
"git-version",
|
"git-version",
|
||||||
"hex",
|
"hex",
|
||||||
"hmac",
|
"hmac 0.10.1",
|
||||||
"http",
|
"http",
|
||||||
"hyper",
|
"hyper",
|
||||||
"kuska-sodiumoxide",
|
"kuska-sodiumoxide",
|
||||||
|
@ -904,7 +914,7 @@ dependencies = [
|
||||||
"garage_table 0.7.0",
|
"garage_table 0.7.0",
|
||||||
"garage_util 0.7.0",
|
"garage_util 0.7.0",
|
||||||
"hex",
|
"hex",
|
||||||
"hmac",
|
"hmac 0.10.1",
|
||||||
"http",
|
"http",
|
||||||
"http-range",
|
"http-range",
|
||||||
"httpdate 0.3.2",
|
"httpdate 0.3.2",
|
||||||
|
@ -1296,6 +1306,16 @@ dependencies = [
|
||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hmac"
|
||||||
|
version = "0.11.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
|
||||||
|
dependencies = [
|
||||||
|
"crypto-mac 0.11.1",
|
||||||
|
"digest",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "0.2.6"
|
version = "0.2.6"
|
||||||
|
@ -1523,6 +1543,21 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "k2v-client"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"http",
|
||||||
|
"rusoto_core",
|
||||||
|
"rusoto_credential",
|
||||||
|
"rusoto_signature",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thiserror",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "k8s-openapi"
|
name = "k8s-openapi"
|
||||||
version = "0.13.1"
|
version = "0.13.1"
|
||||||
|
@ -2533,6 +2568,75 @@ dependencies = [
|
||||||
"xmlparser",
|
"xmlparser",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rusoto_core"
|
||||||
|
version = "0.48.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"base64",
|
||||||
|
"bytes 1.1.0",
|
||||||
|
"crc32fast",
|
||||||
|
"futures",
|
||||||
|
"http",
|
||||||
|
"hyper",
|
||||||
|
"hyper-tls",
|
||||||
|
"lazy_static",
|
||||||
|
"log",
|
||||||
|
"rusoto_credential",
|
||||||
|
"rusoto_signature",
|
||||||
|
"rustc_version",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"tokio",
|
||||||
|
"xml-rs",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rusoto_credential"
|
||||||
|
version = "0.48.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ee0a6c13db5aad6047b6a44ef023dbbc21a056b6dab5be3b79ce4283d5c02d05"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"dirs-next",
|
||||||
|
"futures",
|
||||||
|
"hyper",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"shlex",
|
||||||
|
"tokio",
|
||||||
|
"zeroize",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rusoto_signature"
|
||||||
|
version = "0.48.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"bytes 1.1.0",
|
||||||
|
"chrono",
|
||||||
|
"digest",
|
||||||
|
"futures",
|
||||||
|
"hex",
|
||||||
|
"hmac 0.11.0",
|
||||||
|
"http",
|
||||||
|
"hyper",
|
||||||
|
"log",
|
||||||
|
"md-5",
|
||||||
|
"percent-encoding",
|
||||||
|
"pin-project-lite",
|
||||||
|
"rusoto_credential",
|
||||||
|
"rustc_version",
|
||||||
|
"serde",
|
||||||
|
"sha2",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc_version"
|
name = "rustc_version"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -2669,9 +2773,9 @@ checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.136"
|
version = "1.0.137"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
|
checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
@ -2697,9 +2801,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.136"
|
version = "1.0.137"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
|
checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -2719,9 +2823,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_json"
|
name = "serde_json"
|
||||||
version = "1.0.79"
|
version = "1.0.81"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
|
checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"itoa",
|
"itoa",
|
||||||
|
@ -2754,6 +2858,12 @@ dependencies = [
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "shlex"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "signal-hook-registry"
|
name = "signal-hook-registry"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
@ -2902,9 +3012,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.89"
|
version = "1.0.94"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
|
checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -2957,18 +3067,18 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thiserror"
|
name = "thiserror"
|
||||||
version = "1.0.30"
|
version = "1.0.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
|
checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"thiserror-impl",
|
"thiserror-impl",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thiserror-impl"
|
name = "thiserror-impl"
|
||||||
version = "1.0.30"
|
version = "1.0.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
|
checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -3535,6 +3645,12 @@ version = "0.32.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
|
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "xml-rs"
|
||||||
|
version = "0.8.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "xmlparser"
|
name = "xmlparser"
|
||||||
version = "0.13.3"
|
version = "0.13.3"
|
||||||
|
|
|
@ -8,7 +8,8 @@ members = [
|
||||||
"src/admin",
|
"src/admin",
|
||||||
"src/api",
|
"src/api",
|
||||||
"src/web",
|
"src/web",
|
||||||
"src/garage"
|
"src/garage",
|
||||||
|
"src/k2v-client",
|
||||||
]
|
]
|
||||||
trinity-1686a marked this conversation as resolved
|
|||||||
|
|
||||||
[profile.dev]
|
[profile.dev]
|
||||||
|
|
15
src/k2v-client/Cargo.toml
Normal file
15
src/k2v-client/Cargo.toml
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
[package]
|
||||||
|
name = "k2v-client"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
base64 = "0.13.0"
|
||||||
|
http = "0.2.6"
|
||||||
|
rusoto_core = "0.48.0"
|
||||||
|
rusoto_credential = "0.48.0"
|
||||||
|
rusoto_signature = "0.48.0"
|
||||||
|
serde = "1.0.137"
|
||||||
|
serde_json = "1.0.81"
|
||||||
|
thiserror = "1.0.31"
|
||||||
|
tokio = "1.17.0"
|
20
src/k2v-client/src/error.rs
Normal file
20
src/k2v-client/src/error.rs
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
use std::borrow::Cow;
|
||||||
|
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// Errors returned by this crate
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("received invalid response: {0}")]
|
||||||
|
InvalidResponse(Cow<'static, str>),
|
||||||
|
#[error("not found")]
|
||||||
|
NotFound,
|
||||||
|
#[error("io error: {0}")]
|
||||||
|
IoError(#[from] std::io::Error),
|
||||||
|
#[error("rusoto tls error: {0}")]
|
||||||
|
RusotoTls(#[from] rusoto_core::request::TlsError),
|
||||||
|
#[error("rusoto http error: {0}")]
|
||||||
|
RusotoHttp(#[from] rusoto_core::HttpDispatchError),
|
||||||
|
#[error("deserialization error: {0}")]
|
||||||
|
Deserialization(#[from] serde_json::Error),
|
||||||
|
}
|
570
src/k2v-client/src/lib.rs
Normal file
570
src/k2v-client/src/lib.rs
Normal file
|
@ -0,0 +1,570 @@
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
|
||||||
|
use http::status::StatusCode;
|
||||||
|
use http::HeaderMap;
|
||||||
|
|
||||||
|
use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
|
||||||
|
use rusoto_credential::AwsCredentials;
|
||||||
|
use rusoto_signature::region::Region;
|
||||||
|
use rusoto_signature::signature::SignedRequest;
|
||||||
|
use serde::de::Error as DeError;
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
mod error;
|
||||||
|
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
|
||||||
|
const SERVICE: &str = "k2v";
|
||||||
|
const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
|
||||||
|
|
||||||
|
/// Client used to query a K2V server.
|
||||||
|
pub struct K2vClient {
|
||||||
|
region: Region,
|
||||||
|
bucket: String,
|
||||||
|
creds: AwsCredentials,
|
||||||
|
client: HttpClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl K2vClient {
|
||||||
|
/// Create a new K2V client.
|
||||||
|
pub fn new(
|
||||||
|
region: Region,
|
||||||
|
bucket: String,
|
||||||
|
creds: AwsCredentials,
|
||||||
|
user_agent: Option<String>,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let mut client = HttpClient::new()?;
|
||||||
|
if let Some(ua) = user_agent {
|
||||||
|
client.local_agent_prepend(ua);
|
||||||
|
} else {
|
||||||
|
client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION")));
|
||||||
|
}
|
||||||
|
Ok(K2vClient {
|
||||||
|
region,
|
||||||
|
bucket,
|
||||||
|
creds,
|
||||||
|
client,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a ReadItem request, reading the value(s) stored for a single pk+sk.
|
||||||
|
pub async fn read_item(
|
||||||
|
&self,
|
||||||
|
partition_key: &str,
|
||||||
|
sort_key: &str,
|
||||||
|
) -> Result<CausalValue, Error> {
|
||||||
|
let mut req = SignedRequest::new(
|
||||||
|
"GET",
|
||||||
|
SERVICE,
|
||||||
|
&self.region,
|
||||||
|
&format!("/{}/{}", self.bucket, partition_key),
|
||||||
|
);
|
||||||
|
req.add_param("sort_key", sort_key);
|
||||||
|
req.add_header(ACCEPT, "application/octet-stream, application/json");
|
||||||
|
|
||||||
|
let res = self.dispatch(req, None).await?;
|
||||||
|
|
||||||
|
let causality = res
|
||||||
|
.causality_token
|
||||||
|
.ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?;
|
||||||
|
|
||||||
|
if res.status == StatusCode::NO_CONTENT {
|
||||||
|
return Ok(CausalValue {
|
||||||
|
causality,
|
||||||
|
value: vec![K2vValue::Tombstone],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
match res.content_type.as_deref() {
|
||||||
|
Some("application/octet-stream") => Ok(CausalValue {
|
||||||
|
causality,
|
||||||
|
value: vec![K2vValue::Value(res.body)],
|
||||||
|
}),
|
||||||
|
Some("application/json") => {
|
||||||
|
let value = serde_json::from_slice(&res.body)?;
|
||||||
|
Ok(CausalValue { causality, value })
|
||||||
|
}
|
||||||
|
Some(ct) => Err(Error::InvalidResponse(
|
||||||
|
format!("invalid content type: {}", ct).into(),
|
||||||
|
)),
|
||||||
|
None => Err(Error::InvalidResponse("missing content type".into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be
|
||||||
|
/// updated.
|
||||||
|
pub async fn poll_item(
|
||||||
|
&self,
|
||||||
|
partition_key: &str,
|
||||||
|
sort_key: &str,
|
||||||
|
causality: CausalityToken,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<Option<CausalValue>, Error> {
|
||||||
|
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
|
||||||
|
|
||||||
|
let mut req = SignedRequest::new(
|
||||||
|
"GET",
|
||||||
|
SERVICE,
|
||||||
|
&self.region,
|
||||||
|
&format!("/{}/{}", self.bucket, partition_key),
|
||||||
|
);
|
||||||
|
req.add_param("sort_key", sort_key);
|
||||||
|
req.add_param("causality_token", &causality.0);
|
||||||
|
req.add_param("timeout", &timeout.as_secs().to_string());
|
||||||
|
req.add_header(ACCEPT, "application/octet-stream, application/json;q=0.9");
|
||||||
lx marked this conversation as resolved
Outdated
lx
commented
The server doesn't handle The server doesn't handle `q=0.9`, this probably breaks k2v in its current state. In all cases when both are specified, `application/octet-stream` is already preferred when possible.
trinity-1686a
commented
I noticed when testing, but apparently I forgot to commit I noticed when testing, but apparently I forgot to commit
|
|||||||
|
|
||||||
|
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
|
||||||
|
|
||||||
|
let causality = res
|
||||||
|
.causality_token
|
||||||
|
.ok_or_else(|| Error::InvalidResponse("missing causality token".into()))?;
|
||||||
|
|
||||||
|
if res.status == StatusCode::NOT_MODIFIED {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.status == StatusCode::NO_CONTENT {
|
||||||
|
return Ok(Some(CausalValue {
|
||||||
|
causality,
|
||||||
|
value: vec![K2vValue::Tombstone],
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
match res.content_type.as_deref() {
|
||||||
|
Some("application/octet-stream") => Ok(Some(CausalValue {
|
||||||
|
causality,
|
||||||
|
value: vec![K2vValue::Value(res.body)],
|
||||||
|
})),
|
||||||
|
Some("application/json") => {
|
||||||
|
let value = serde_json::from_slice(&res.body)?;
|
||||||
|
Ok(Some(CausalValue { causality, value }))
|
||||||
|
}
|
||||||
|
Some(ct) => Err(Error::InvalidResponse(
|
||||||
|
format!("invalid content type: {}", ct).into(),
|
||||||
|
)),
|
||||||
|
None => Err(Error::InvalidResponse("missing content type".into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform an InsertItem request, inserting a value for a single pk+sk.
|
||||||
|
pub async fn insert_item(
|
||||||
|
&self,
|
||||||
|
partition_key: &str,
|
||||||
|
sort_key: &str,
|
||||||
|
value: Vec<u8>,
|
||||||
|
causality: Option<CausalityToken>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut req = SignedRequest::new(
|
||||||
|
"PUT",
|
||||||
|
SERVICE,
|
||||||
|
&self.region,
|
||||||
|
&format!("/{}/{}", self.bucket, partition_key),
|
||||||
|
);
|
||||||
|
req.add_param("sort_key", sort_key);
|
||||||
|
req.set_payload(Some(value));
|
||||||
|
|
||||||
|
if let Some(causality) = causality {
|
||||||
|
req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.dispatch(req, None).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk.
|
||||||
|
pub async fn delete_item(
|
||||||
|
&self,
|
||||||
|
partition_key: &str,
|
||||||
|
sort_key: &str,
|
||||||
|
causality: CausalityToken,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut req = SignedRequest::new(
|
||||||
|
"DELETE",
|
||||||
|
SERVICE,
|
||||||
|
&self.region,
|
||||||
|
&format!("/{}/{}", self.bucket, partition_key),
|
||||||
|
);
|
||||||
|
req.add_param("sort_key", sort_key);
|
||||||
|
req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
|
||||||
|
|
||||||
|
self.dispatch(req, None).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO poke team, draft doc outdated fot the return type of this endpoint
|
||||||
lx marked this conversation as resolved
lx
commented
thx thx
|
|||||||
|
/// Perform a ReadIndex request, listing partition key which have at least one associated
|
||||||
|
/// sort key, and which matches the filter.
|
||||||
|
pub async fn read_index<'a>(
|
||||||
lx marked this conversation as resolved
Outdated
trinity-1686a
commented
The documentation says the response is
but actually partitionKeys values are The documentation says the response is
```json
{
prefix: null,
start: null,
end: null,
limit: null,
reverse: false,
partitionKeys: [
{ pk: "keys", n: 3043 },
{ pk: "mailbox:INBOX", n: 42 },
{ pk: "mailbox:Junk", n: 2991 },
{ pk: "mailbox:Trash", n: 10 },
{ pk: "mailboxes", n: 3 },
],
more: false,
nextStart: null,
}
```
but actually partitionKeys values are `{pk: "keys", entries: 1, conflicts: 1, values: 1, bytes: 1}`
|
|||||||
|
&self,
|
||||||
|
filter: Filter<'a>,
|
||||||
|
) -> Result<PaginatedRange<PartitionInfo>, Error> {
|
||||||
|
let mut req =
|
||||||
|
SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket));
|
||||||
|
filter.insert_params(&mut req);
|
||||||
|
|
||||||
|
let res = self.dispatch(req, None).await?;
|
||||||
|
|
||||||
|
let resp: ReadIndexResponse = serde_json::from_slice(&res.body)?;
|
||||||
|
|
||||||
|
let items = resp
|
||||||
|
.partition_keys
|
||||||
|
.into_iter()
|
||||||
|
.map(|ReadIndexItem { pk, info }| (pk, info))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(PaginatedRange {
|
||||||
|
items,
|
||||||
|
next_start: resp.next_start,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is
|
||||||
|
/// *not* atomic: it is possible for some sub-operations to fails and others to success. In
|
||||||
|
/// that case, failure is reported.
|
||||||
|
pub async fn insert_batch<'a>(&self, operations: &[BatchInsertOp<'a>]) -> Result<(), Error> {
|
||||||
|
let mut req =
|
||||||
|
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
|
||||||
|
|
||||||
|
let payload = serde_json::to_vec(operations)?;
|
||||||
|
req.set_payload(Some(payload));
|
||||||
|
self.dispatch(req, None).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a ReadBatch request, reading multiple values or range of values at once.
|
||||||
|
pub async fn read_batch<'a>(
|
||||||
|
&self,
|
||||||
|
operations: &[BatchReadOp<'a>],
|
||||||
|
) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
|
||||||
|
let mut req =
|
||||||
|
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
|
||||||
|
req.add_param("search", "");
|
||||||
|
|
||||||
|
let payload = serde_json::to_vec(operations)?;
|
||||||
|
req.set_payload(Some(payload));
|
||||||
|
let res = self.dispatch(req, None).await?;
|
||||||
|
|
||||||
|
let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
|
||||||
|
|
||||||
|
Ok(resp
|
||||||
|
.into_iter()
|
||||||
|
.map(|e| PaginatedRange {
|
||||||
|
items: e
|
||||||
|
.items
|
||||||
|
.into_iter()
|
||||||
|
.map(|BatchReadItem { sk, ct, v }| {
|
||||||
|
(
|
||||||
|
sk,
|
||||||
|
CausalValue {
|
||||||
|
causality: ct,
|
||||||
|
value: v,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
next_start: e.next_start,
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
|
||||||
|
/// providing causality information.
|
||||||
|
pub async fn delete_batch<'a>(
|
||||||
|
&self,
|
||||||
|
operations: &[BatchDeleteOp<'a>],
|
||||||
|
) -> Result<Vec<u64>, Error> {
|
||||||
|
let mut req =
|
||||||
|
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
|
||||||
|
req.add_param("delete", "");
|
||||||
|
|
||||||
|
let payload = serde_json::to_vec(operations)?;
|
||||||
|
req.set_payload(Some(payload));
|
||||||
|
let res = self.dispatch(req, None).await?;
|
||||||
|
|
||||||
|
let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
|
||||||
|
|
||||||
|
Ok(resp.into_iter().map(|r| r.deleted_items).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn dispatch(
|
||||||
|
&self,
|
||||||
|
mut req: SignedRequest,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<Response, Error> {
|
||||||
|
req.sign(&self.creds);
|
||||||
|
let mut res = self
|
||||||
|
.client
|
||||||
|
.dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let causality_token = res
|
||||||
|
.headers
|
||||||
|
.remove(GARAGE_CAUSALITY_TOKEN)
|
||||||
|
.map(CausalityToken);
|
||||||
|
let content_type = res.headers.remove(CONTENT_TYPE);
|
||||||
|
|
||||||
|
let body = match res.status {
|
||||||
|
StatusCode::OK => read_body(&mut res.headers, res.body).await?,
|
||||||
|
StatusCode::NO_CONTENT => Vec::new(),
|
||||||
|
StatusCode::NOT_FOUND => return Err(Error::NotFound),
|
||||||
|
StatusCode::NOT_MODIFIED => Vec::new(),
|
||||||
|
_ => {
|
||||||
|
return Err(Error::InvalidResponse(
|
||||||
|
format!("invalid error code: {}", res.status).into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response {
|
||||||
|
body,
|
||||||
|
status: res.status,
|
||||||
|
causality_token,
|
||||||
|
content_type,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> {
|
||||||
|
let body_len = headers
|
||||||
|
.get(CONTENT_LENGTH)
|
||||||
|
.and_then(|h| h.parse().ok())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let mut res = Vec::with_capacity(body_len);
|
||||||
|
body.into_async_read().read_to_end(&mut res).await?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An opaque token used to convey causality between operations.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
pub struct CausalityToken(String);
|
||||||
|
|
||||||
|
impl From<String> for CausalityToken {
|
||||||
|
fn from(v: String) -> Self {
|
||||||
|
CausalityToken(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CausalityToken> for String {
|
||||||
|
fn from(v: CausalityToken) -> Self {
|
||||||
|
v.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A value in K2V. can be either a binary value, or a tombstone.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum K2vValue {
|
||||||
|
Tombstone,
|
||||||
|
Value(Vec<u8>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Vec<u8>> for K2vValue {
|
||||||
|
fn from(v: Vec<u8>) -> Self {
|
||||||
|
K2vValue::Value(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Option<Vec<u8>>> for K2vValue {
|
||||||
|
fn from(v: Option<Vec<u8>>) -> Self {
|
||||||
|
match v {
|
||||||
|
Some(v) => K2vValue::Value(v),
|
||||||
|
None => K2vValue::Tombstone,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'de> Deserialize<'de> for K2vValue {
|
||||||
|
fn deserialize<D>(d: D) -> Result<Self, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let val: Option<&str> = Option::deserialize(d)?;
|
||||||
|
Ok(match val {
|
||||||
|
Some(s) => {
|
||||||
|
K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?)
|
||||||
|
}
|
||||||
|
None => K2vValue::Tombstone,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Serialize for K2vValue {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
K2vValue::Tombstone => serializer.serialize_none(),
|
||||||
|
K2vValue::Value(v) => {
|
||||||
|
let b64 = base64::encode(v);
|
||||||
|
serializer.serialize_str(&b64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A set of K2vValue and associated causality information.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct CausalValue {
|
||||||
|
pub causality: CausalityToken,
|
||||||
|
pub value: Vec<K2vValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Result of paginated requests.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct PaginatedRange<V> {
|
||||||
|
pub items: BTreeMap<String, V>,
|
||||||
|
pub next_start: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Filter for batch operations.
|
||||||
|
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
|
||||||
|
pub struct Filter<'a> {
|
||||||
|
pub start: Option<&'a str>,
|
||||||
|
pub end: Option<&'a str>,
|
||||||
|
pub prefix: Option<&'a str>,
|
||||||
|
pub limit: Option<u64>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub reverse: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Filter<'a> {
|
||||||
|
fn insert_params(&self, req: &mut SignedRequest) {
|
||||||
|
if let Some(start) = &self.start {
|
||||||
|
req.add_param("start", start);
|
||||||
|
}
|
||||||
|
if let Some(end) = &self.end {
|
||||||
|
req.add_param("end", end);
|
||||||
|
}
|
||||||
|
if let Some(prefix) = &self.prefix {
|
||||||
|
req.add_param("prefix", prefix);
|
||||||
|
}
|
||||||
|
if let Some(limit) = &self.limit {
|
||||||
|
req.add_param("limit", &limit.to_string());
|
||||||
|
}
|
||||||
|
if self.reverse {
|
||||||
|
req.add_param("reverse", "true");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct ReadIndexResponse<'a> {
|
||||||
|
#[serde(flatten, borrow)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
filter: Filter<'a>,
|
||||||
|
partition_keys: Vec<ReadIndexItem>,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
more: bool,
|
||||||
|
next_start: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
struct ReadIndexItem {
|
||||||
|
pk: String,
|
||||||
|
#[serde(flatten)]
|
||||||
|
info: PartitionInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Information about data stored with a given partition key.
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub struct PartitionInfo {
|
||||||
|
pub entries: u64,
|
||||||
|
pub conflicts: u64,
|
||||||
|
pub values: u64,
|
||||||
|
pub bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single sub-operation of an InsertBatch.
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct BatchInsertOp<'a> {
|
||||||
|
#[serde(rename = "pk")]
|
||||||
|
pub partition_key: &'a str,
|
||||||
|
#[serde(rename = "sk")]
|
||||||
|
pub sort_key: &'a str,
|
||||||
|
#[serde(rename = "ct")]
|
||||||
|
pub causality: Option<CausalityToken>,
|
||||||
|
#[serde(rename = "v")]
|
||||||
|
pub value: K2vValue,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single sub-operation of a ReadBatch.
|
||||||
|
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct BatchReadOp<'a> {
|
||||||
|
pub partition_key: &'a str,
|
||||||
|
#[serde(flatten, borrow)]
|
||||||
|
pub filter: Filter<'a>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub single_item: bool,
|
||||||
lx
commented
This field isn't called This field isn't called `include_tombstones` but just `tombstones` (currently the `-t` flag is not working in k2v-cli)
|
|||||||
|
#[serde(default)]
|
||||||
|
pub concflicts_only: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub include_tombstones: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct BatchReadResponse<'a> {
|
||||||
|
#[serde(flatten, borrow)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
op: BatchReadOp<'a>,
|
||||||
|
items: Vec<BatchReadItem>,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
more: bool,
|
||||||
|
next_start: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
struct BatchReadItem {
|
||||||
|
sk: String,
|
||||||
|
ct: CausalityToken,
|
||||||
|
v: Vec<K2vValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Single sub-operation of a DeleteBatch
|
||||||
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct BatchDeleteOp<'a> {
|
||||||
|
pub partition_key: &'a str,
|
||||||
|
pub prefix: Option<&'a str>,
|
||||||
|
pub start: Option<&'a str>,
|
||||||
|
pub end: Option<&'a str>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub single_item: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> BatchDeleteOp<'a> {
|
||||||
|
pub fn new(partition_key: &'a str) -> Self {
|
||||||
|
BatchDeleteOp {
|
||||||
|
partition_key,
|
||||||
|
prefix: None,
|
||||||
|
start: None,
|
||||||
|
end: None,
|
||||||
|
single_item: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
struct BatchDeleteResponse<'a> {
|
||||||
|
#[serde(flatten, borrow)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
filter: BatchDeleteOp<'a>,
|
||||||
|
deleted_items: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Response {
|
||||||
|
body: Vec<u8>,
|
||||||
|
status: StatusCode,
|
||||||
|
causality_token: Option<CausalityToken>,
|
||||||
|
content_type: Option<String>,
|
||||||
|
}
|
47
src/k2v-client/src/main.rs
Normal file
47
src/k2v-client/src/main.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
use k2v_client::*;
|
||||||
|
use rusoto_core::credential::{EnvironmentProvider, ProvideAwsCredentials};
|
||||||
|
use rusoto_core::Region;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Error> {
|
||||||
|
// TODO provide a CLI to perform queries
|
||||||
|
let region = Region::Custom {
|
||||||
|
name: "us-east-1".to_owned(),
|
||||||
|
endpoint: "http://172.30.2.1:3903".to_owned(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let creds = EnvironmentProvider::default().credentials().await.unwrap();
|
||||||
|
|
||||||
|
let client = K2vClient::new(region, "my-bucket".to_owned(), creds, None)?;
|
||||||
|
|
||||||
|
client.insert_item("pk", "sk", vec![0x12], None).await?;
|
||||||
|
|
||||||
|
/*
|
||||||
|
dbg!(client.read_item("pk", "sk").await?);
|
||||||
|
|
||||||
|
client.delete_item("patate", "patate", "eFmifSwRtcl4WaJ9LBG1ywAAAAAAAAAC".to_owned().into()).await?;
|
||||||
|
|
||||||
|
dbg!(client.read_index(Filter::default()).await?);
|
||||||
|
|
||||||
|
client.insert_batch(&[
|
||||||
|
BatchInsertOp {
|
||||||
|
partition_key: "pk",
|
||||||
|
sort_key: "sk1",
|
||||||
|
causality: None,
|
||||||
|
value: vec![1,2,3].into(),
|
||||||
|
},
|
||||||
|
BatchInsertOp {
|
||||||
|
partition_key: "pk",
|
||||||
|
sort_key: "sk2",
|
||||||
|
causality: None,
|
||||||
|
value: vec![1,2,4].into(),
|
||||||
|
},
|
||||||
|
]).await?;
|
||||||
|
|
||||||
|
dbg!(client.read_batch(&[BatchReadOp { partition_key: "pk", ..BatchReadOp::default()}]).await?);
|
||||||
|
|
||||||
|
dbg!(client.delete_batch(&[BatchDeleteOp::new("pk")]).await?);
|
||||||
|
*/
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Reference in a new issue
In the current state this breaks
cargo run
in the root project directory:To fix this, just add: