use std::time::Duration; use crate::common; use assert_json_diff::assert_json_eq; use base64::prelude::*; use serde_json::json; use super::json_body; use hyper::{Method, StatusCode}; #[tokio::test] async fn test_items_and_indices() { let ctx = common::context(); let bucket = ctx.create_bucket("test-k2v-item-and-index"); // ReadIndex -- there should be nothing let res = ctx .k2v .request .builder(bucket.clone()) .send() .await .unwrap(); let res_body = json_body(res).await; assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [], "more": false, "nextStart": null }) ); let content2_len = "_: hello universe".len(); let content3_len = "_: concurrent value".len(); for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { let content = format!("{}: hello world", sk).into_bytes(); let content2 = format!("{}: hello universe", sk).into_bytes(); let content3 = format!("{}: concurrent value", sk).into_bytes(); // Put initially, no causality token let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .body(content.clone()) .method(Method::PUT) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // Get value back let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); let ct = res .headers() .get("x-garage-causality-token") .unwrap() .to_str() .unwrap() .to_string(); let res_body = hyper::body::to_bytes(res.into_body()) .await .unwrap() .to_vec(); assert_eq!(res_body, content); // ReadIndex -- now there should be some stuff tokio::time::sleep(Duration::from_secs(1)).await; let res = ctx .k2v .request .builder(bucket.clone()) .send() .await .unwrap(); let res_body = json_body(res).await; assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [ { "pk": "root", "entries": i+1, "conflicts": i, "values": i+i+1, "bytes": i*(content2.len() + content3.len()) + content.len(), } ], "more": false, "nextStart": null }) ); // Put again, this time with causality token let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("x-garage-causality-token", ct.clone()) .body(content2.clone()) .method(Method::PUT) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // Get value back let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); let res_body = hyper::body::to_bytes(res.into_body()) .await .unwrap() .to_vec(); assert_eq!(res_body, content2); // ReadIndex -- now there should be some stuff tokio::time::sleep(Duration::from_secs(1)).await; let res = ctx .k2v .request .builder(bucket.clone()) .send() .await .unwrap(); let res_body = json_body(res).await; assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [ { "pk": "root", "entries": i+1, "conflicts": i, "values": i+i+1, "bytes": i*content3.len() + (i+1)*content2.len(), } ], "more": false, "nextStart": null }) ); // Put again with same CT, now we have concurrent values let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("x-garage-causality-token", ct.clone()) .body(content3.clone()) .method(Method::PUT) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // Get value back let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_json = json_body(res).await; assert_json_eq!( res_json, [ BASE64_STANDARD.encode(&content2), BASE64_STANDARD.encode(&content3) ] ); // ReadIndex -- now there should be some stuff tokio::time::sleep(Duration::from_secs(1)).await; let res = ctx .k2v .request .builder(bucket.clone()) .send() .await .unwrap(); let res_body = json_body(res).await; assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [ { "pk": "root", "entries": i+1, "conflicts": i+1, "values": 2*(i+1), "bytes": (i+1)*(content2.len() + content3.len()), } ], "more": false, "nextStart": null }) ); } // Now delete things for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { // Get value back (we just need the CT) let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some(sk)) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); let ct = res .headers() .get("x-garage-causality-token") .unwrap() .to_str() .unwrap() .to_string(); // Delete it let res = ctx .k2v .request .builder(bucket.clone()) .method(Method::DELETE) .path("root") .query_param("sort_key", Some(sk)) .signed_header("x-garage-causality-token", ct) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // ReadIndex -- now there should be some stuff tokio::time::sleep(Duration::from_secs(1)).await; let res = ctx .k2v .request .builder(bucket.clone()) .send() .await .unwrap(); let res_body = json_body(res).await; if i < 3 { assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [ { "pk": "root", "entries": 3-i, "conflicts": 3-i, "values": 2*(3-i), "bytes": (3-i)*(content2_len + content3_len), } ], "more": false, "nextStart": null }) ); } else { assert_json_eq!( res_body, json!({ "prefix": null, "start": null, "end": null, "limit": null, "reverse": false, "partitionKeys": [], "more": false, "nextStart": null }) ); } } } #[tokio::test] async fn test_item_return_format() { let ctx = common::context(); let bucket = ctx.create_bucket("test-k2v-item-return-format"); let single_value = b"A single value".to_vec(); let concurrent_value = b"A concurrent value".to_vec(); // -- Test with a single value -- let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .body(single_value.clone()) .method(Method::PUT) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // f0: either let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); let ct = res .headers() .get("x-garage-causality-token") .unwrap() .to_str() .unwrap() .to_string(); let res_body = hyper::body::to_bytes(res.into_body()) .await .unwrap() .to_vec(); assert_eq!(res_body, single_value); // f1: not specified let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // f2: binary let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/octet-stream") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/octet-stream" ); let res_body = hyper::body::to_bytes(res.into_body()) .await .unwrap() .to_vec(); assert_eq!(res_body, single_value); // f3: json let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/json") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // -- Test with a second, concurrent value -- let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .body(concurrent_value.clone()) .method(Method::PUT) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // f0: either let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([ BASE64_STANDARD.encode(&single_value), BASE64_STANDARD.encode(&concurrent_value) ]) ); // f1: not specified let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([ BASE64_STANDARD.encode(&single_value), BASE64_STANDARD.encode(&concurrent_value) ]) ); // f2: binary let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/octet-stream") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT // f3: json let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/json") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([ BASE64_STANDARD.encode(&single_value), BASE64_STANDARD.encode(&concurrent_value) ]) ); // -- Delete first value, concurrently with second insert -- // -- (we now have a concurrent value and a deletion) -- let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .method(Method::DELETE) .signed_header("x-garage-causality-token", ct) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // f0: either let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([BASE64_STANDARD.encode(&concurrent_value), null]) ); // f1: not specified let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let ct = res .headers() .get("x-garage-causality-token") .unwrap() .to_str() .unwrap() .to_string(); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([BASE64_STANDARD.encode(&concurrent_value), null]) ); // f2: binary let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/octet-stream") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT // f3: json let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/json") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!( res_body, json!([BASE64_STANDARD.encode(&concurrent_value), null]) ); // -- Delete everything -- let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .method(Method::DELETE) .signed_header("x-garage-causality-token", ct) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // f0: either let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "*/*") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT // f1: not specified let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!(res_body, json!([null])); // f2: binary let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/octet-stream") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT // f3: json let res = ctx .k2v .request .builder(bucket.clone()) .path("root") .query_param("sort_key", Some("v1")) .signed_header("accept", "application/json") .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); assert_eq!( res.headers().get("content-type").unwrap().to_str().unwrap(), "application/json" ); let res_body = json_body(res).await; assert_json_eq!(res_body, json!([null])); }