K2V #293
10 changed files with 396 additions and 41 deletions
13
Cargo.lock
generated
13
Cargo.lock
generated
|
@ -29,6 +29,16 @@ version = "0.5.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
|
||||
|
||||
[[package]]
|
||||
name = "assert-json-diff"
|
||||
version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50f1c3703dd33532d7f0ca049168930e9099ecac238e23cf932f3a69c42f06da"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.3"
|
||||
|
@ -821,8 +831,10 @@ dependencies = [
|
|||
name = "garage"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"assert-json-diff",
|
||||
"async-trait",
|
||||
"aws-sdk-s3",
|
||||
"base64",
|
||||
"bytes 1.1.0",
|
||||
"chrono",
|
||||
"futures",
|
||||
|
@ -846,6 +858,7 @@ dependencies = [
|
|||
"rmp-serde 0.15.5",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"sled",
|
||||
"static_init",
|
||||
|
|
|
@ -183,6 +183,6 @@ pub async fn handle_delete_item(
|
|||
.await?;
|
||||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
|
|
|
@ -63,3 +63,6 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
|
|||
sha2 = "0.9"
|
||||
|
||||
static_init = "1.0"
|
||||
assert-json-diff = "2.0"
|
||||
serde_json = "1.0"
|
||||
base64 = "0.13"
|
||||
|
|
|
@ -81,8 +81,8 @@ impl<'a> RequestBuilder<'a> {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn path(&mut self, path: String) -> &mut Self {
|
||||
self.path = path;
|
||||
pub fn path(&mut self, path: impl ToString) -> &mut Self {
|
||||
self.path = path.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -92,8 +92,12 @@ impl<'a> RequestBuilder<'a> {
|
|||
}
|
||||
|
||||
pub fn query_param<T, U>(&mut self, param: T, value: Option<U>) -> &mut Self
|
||||
where T: ToString, U: ToString, {
|
||||
self.query_params.insert(param.to_string(), value.as_ref().map(ToString::to_string));
|
||||
where
|
||||
T: ToString,
|
||||
U: ToString,
|
||||
{
|
||||
self.query_params
|
||||
.insert(param.to_string(), value.as_ref().map(ToString::to_string));
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -103,7 +107,8 @@ impl<'a> RequestBuilder<'a> {
|
|||
}
|
||||
|
||||
pub fn signed_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
|
||||
self.signed_headers.insert(name.to_string(), value.to_string());
|
||||
self.signed_headers
|
||||
.insert(name.to_string(), value.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -113,7 +118,8 @@ impl<'a> RequestBuilder<'a> {
|
|||
}
|
||||
|
||||
pub fn unsigned_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
|
||||
self.unsigned_headers.insert(name.to_string(), value.to_string());
|
||||
self.unsigned_headers
|
||||
.insert(name.to_string(), value.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ impl Context {
|
|||
custom_request,
|
||||
k2v: K2VContext {
|
||||
request: k2v_request,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
333
src/garage/tests/k2v/item.rs
Normal file
333
src/garage/tests/k2v/item.rs
Normal file
|
@ -0,0 +1,333 @@
|
|||
use crate::common;
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
use serde_json::json;
|
||||
|
||||
use super::json_body;
|
||||
use hyper::Method;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_items_and_indices() {
|
||||
let ctx = common::context();
|
||||
let bucket = ctx.create_bucket("test-k2v-item-and-index");
|
||||
|
||||
// ReadIndex -- there should be nothing
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let res_body = json_body(res).await;
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
|
||||
let content2_len = "_: hello universe".len();
|
||||
let content3_len = "_: concurrent value".len();
|
||||
|
||||
for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
|
||||
let content = format!("{}: hello world", sk).into_bytes();
|
||||
let content2 = format!("{}: hello universe", sk).into_bytes();
|
||||
let content3 = format!("{}: concurrent value", sk).into_bytes();
|
||||
|
||||
// Put initially, no causality token
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.body(content.clone())
|
||||
.method(Method::PUT)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
|
||||
// Get value back
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("accept", "*/*")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
assert_eq!(
|
||||
res.headers().get("content-type").unwrap().to_str().unwrap(),
|
||||
"application/octet-stream"
|
||||
);
|
||||
let ct = res
|
||||
.headers()
|
||||
.get("x-garage-causality-token")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let res_body = hyper::body::to_bytes(res.into_body())
|
||||
.await
|
||||
.unwrap()
|
||||
.to_vec();
|
||||
assert_eq!(res_body, content);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let res_body = json_body(res).await;
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [
|
||||
{
|
||||
"pk": "root",
|
||||
"entries": i+1,
|
||||
"conflicts": i,
|
||||
"values": i+i+1,
|
||||
"bytes": i*(content2.len() + content3.len()) + content.len(),
|
||||
}
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
|
||||
// Put again, this time with causality token
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("x-garage-causality-token", ct.clone())
|
||||
.body(content2.clone())
|
||||
.method(Method::PUT)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
|
||||
// Get value back
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("accept", "*/*")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
assert_eq!(
|
||||
res.headers().get("content-type").unwrap().to_str().unwrap(),
|
||||
"application/octet-stream"
|
||||
);
|
||||
let res_body = hyper::body::to_bytes(res.into_body())
|
||||
.await
|
||||
.unwrap()
|
||||
.to_vec();
|
||||
assert_eq!(res_body, content2);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let res_body = json_body(res).await;
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [
|
||||
{
|
||||
"pk": "root",
|
||||
"entries": i+1,
|
||||
"conflicts": i,
|
||||
"values": i+i+1,
|
||||
"bytes": i*content3.len() + (i+1)*content2.len(),
|
||||
}
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
|
||||
// Put again with same CT, now we have concurrent values
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("x-garage-causality-token", ct.clone())
|
||||
.body(content3.clone())
|
||||
.method(Method::PUT)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
|
||||
// Get value back
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("accept", "*/*")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
assert_eq!(
|
||||
res.headers().get("content-type").unwrap().to_str().unwrap(),
|
||||
"application/json"
|
||||
);
|
||||
let res_json = json_body(res).await;
|
||||
assert_json_eq!(
|
||||
res_json,
|
||||
[base64::encode(&content2), base64::encode(&content3)]
|
||||
);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let res_body = json_body(res).await;
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [
|
||||
{
|
||||
"pk": "root",
|
||||
"entries": i+1,
|
||||
"conflicts": i+1,
|
||||
"values": 2*(i+1),
|
||||
"bytes": (i+1)*(content2.len() + content3.len()),
|
||||
}
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
// Now delete things
|
||||
for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
|
||||
// Get value back (we just need the CT)
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("accept", "*/*")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
let ct = res
|
||||
.headers()
|
||||
.get("x-garage-causality-token")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
// Delete it
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::DELETE)
|
||||
.path("root")
|
||||
.query_param("sort_key", Some(sk))
|
||||
.signed_header("x-garage-causality-token", ct)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 204);
|
||||
|
||||
// ReadIndex -- now there should be some stuff
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let res_body = json_body(res).await;
|
||||
if i < 3 {
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [
|
||||
{
|
||||
"pk": "root",
|
||||
"entries": 3-i,
|
||||
"conflicts": 3-i,
|
||||
"values": 2*(3-i),
|
||||
"bytes": (3-i)*(content2_len + content3_len),
|
||||
}
|
||||
],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
} else {
|
||||
assert_json_eq!(
|
||||
res_body,
|
||||
json!({
|
||||
"prefix": null,
|
||||
"start": null,
|
||||
"end": null,
|
||||
"limit": null,
|
||||
"partitionKeys": [],
|
||||
"more": false,
|
||||
"nextStart": null
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1 +1,15 @@
|
|||
pub mod item;
|
||||
pub mod simple;
|
||||
|
||||
use hyper::{Body, Response};
|
||||
|
||||
pub async fn json_body(res: Response<Body>) -> serde_json::Value {
|
||||
let res_body: serde_json::Value = serde_json::from_slice(
|
||||
&hyper::body::to_bytes(res.into_body())
|
||||
.await
|
||||
.unwrap()
|
||||
.to_vec()[..],
|
||||
)
|
||||
.unwrap();
|
||||
res_body
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use crate::common;
|
||||
use common::custom_requester::BodySignature;
|
||||
|
||||
use hyper::Method;
|
||||
|
||||
|
@ -8,29 +7,34 @@ async fn test_simple() {
|
|||
let ctx = common::context();
|
||||
let bucket = ctx.create_bucket("test-k2v-simple");
|
||||
|
||||
let res = ctx.k2v.request
|
||||
let res = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.method(Method::PUT)
|
||||
.path("root".into())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test1"))
|
||||
.body(b"Hello, world!".to_vec())
|
||||
.body_signature(BodySignature::Classic)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res.status(), 200);
|
||||
|
||||
let res2 = ctx.k2v.request
|
||||
let res2 = ctx
|
||||
.k2v
|
||||
.request
|
||||
.builder(bucket.clone())
|
||||
.path("root".into())
|
||||
.path("root")
|
||||
.query_param("sort_key", Some("test1"))
|
||||
.signed_header("accept", "application/octet-stream")
|
||||
.body_signature(BodySignature::Classic)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(res2.status(), 200);
|
||||
|
||||
let res2_body = hyper::body::to_bytes(res2.into_body()).await.unwrap().to_vec();
|
||||
let res2_body = hyper::body::to_bytes(res2.into_body())
|
||||
.await
|
||||
.unwrap()
|
||||
.to_vec();
|
||||
assert_eq!(res2_body, b"Hello, world!");
|
||||
}
|
||||
|
|
|
@ -3,5 +3,5 @@ mod common;
|
|||
|
||||
mod admin;
|
||||
mod bucket;
|
||||
mod s3;
|
||||
mod k2v;
|
||||
mod s3;
|
||||
|
|
|
@ -35,10 +35,7 @@ async fn test_website() {
|
|||
let req = || {
|
||||
Request::builder()
|
||||
.method("GET")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
|
@ -170,10 +167,7 @@ async fn test_website_s3_api() {
|
|||
{
|
||||
let req = Request::builder()
|
||||
.method("GET")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/site/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.body(Body::empty())
|
||||
|
@ -217,10 +211,7 @@ async fn test_website_s3_api() {
|
|||
{
|
||||
let req = Request::builder()
|
||||
.method("OPTIONS")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/site/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "PUT")
|
||||
|
@ -244,10 +235,7 @@ async fn test_website_s3_api() {
|
|||
{
|
||||
let req = Request::builder()
|
||||
.method("OPTIONS")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/site/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "DELETE")
|
||||
|
@ -288,10 +276,7 @@ async fn test_website_s3_api() {
|
|||
{
|
||||
let req = Request::builder()
|
||||
.method("OPTIONS")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/site/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "PUT")
|
||||
|
@ -319,10 +304,7 @@ async fn test_website_s3_api() {
|
|||
{
|
||||
let req = Request::builder()
|
||||
.method("GET")
|
||||
.uri(format!(
|
||||
"http://127.0.0.1:{}/site/",
|
||||
ctx.garage.web_port
|
||||
))
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
|
Loading…
Reference in a new issue