Compare commits

...

11 commits

14 changed files with 1946 additions and 2049 deletions

1206
Cargo.lock generated

File diff suppressed because it is too large Load diff

2709
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -47,7 +47,7 @@ http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
multer = "2.0"
percent-encoding = "2.1.0"
roxmltree = "0.14"
roxmltree = "0.18"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0"

View file

@ -28,7 +28,7 @@ hex = "0.4"
tracing = "0.1"
rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }

View file

@ -61,7 +61,8 @@ opentelemetry-otlp = { version = "0.10", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
aws-sdk-s3 = "0.19"
aws-config = "0.55.2"
aws-sdk-s3 = "0.28"
chrono = "0.4"
http = "0.2"
hmac = "0.12"

View file

@ -1,7 +1,6 @@
use crate::common;
use crate::common::ext::CommandExt;
use aws_sdk_s3::model::BucketLocationConstraint;
use aws_sdk_s3::output::DeleteBucketOutput;
use aws_sdk_s3::operation::delete_bucket::DeleteBucketOutput;
#[tokio::test]
async fn test_bucket_all() {
@ -63,10 +62,7 @@ async fn test_bucket_all() {
.await
.unwrap();
match r.location_constraint.unwrap() {
BucketLocationConstraint::Unknown(v) if v.as_str() == "garage-integ-test" => (),
_ => unreachable!("wrong region"),
}
assert_eq!(r.location_constraint.unwrap().as_str(), "garage-integ-test");
}
{
// (Stub) check GetVersioning

View file

@ -1,15 +1,16 @@
use aws_sdk_s3::{Client, Config, Credentials, Endpoint};
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::{Client, Config};
use super::garage::{Instance, Key};
use super::garage::Key;
use crate::common::garage::DEFAULT_PORT;
pub fn build_client(instance: &Instance, key: &Key) -> Client {
pub fn build_client(key: &Key) -> Client {
let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test");
let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
.endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
.endpoint_resolver(endpoint)
.build();
Client::from_conf(config)

View file

@ -1,4 +1,5 @@
use aws_sdk_s3::{Client, Region};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::Client;
use ext::*;
use k2v_client::K2vClient;
@ -32,7 +33,7 @@ impl Context {
fn new() -> Self {
let garage = garage::instance();
let key = garage.key(None);
let client = client::build_client(garage, &key);
let client = client::build_client(&key);
let custom_request = CustomRequester::new_s3(garage, &key);
let k2v_request = CustomRequester::new_k2v(garage, &key);

View file

@ -1,6 +1,6 @@
use crate::common;
use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;

View file

@ -1,6 +1,6 @@
use crate::common;
use aws_sdk_s3::model::{Delete, ObjectIdentifier};
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
const STD_KEY: &str = "hello world";
const CTRL_KEY: &str = "\x00\x01\x02\x00";

View file

@ -2,7 +2,7 @@ use crate::common;
#[tokio::test]
async fn test_simple() {
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::primitives::ByteStream;
let ctx = common::context();
let bucket = ctx.create_bucket("test-simple");

View file

@ -4,8 +4,8 @@ use crate::k2v::json_body;
use assert_json_diff::assert_json_eq;
use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
types::ByteStream,
primitives::ByteStream,
types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
};
use http::{Request, StatusCode};
use hyper::{

View file

@ -311,23 +311,19 @@ impl BatchOutputKind {
.collect::<Vec<_>>()
}
fn display_poll_range_output(
&self,
seen_marker: String,
values: BTreeMap<String, CausalValue>,
) -> ! {
fn display_poll_range_output(&self, poll_range: PollRangeResult) -> ! {
if self.json {
let json = serde_json::json!({
"values": self.values_json(values),
"seen_marker": seen_marker,
"values": self.values_json(poll_range.items),
"seen_marker": poll_range.seen_marker,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
println!("seen marker: {}", seen_marker);
self.display_human_output(values)
println!("seen marker: {}", poll_range.seen_marker);
self.display_human_output(poll_range.items)
}
}
@ -501,8 +497,8 @@ async fn main() -> Result<(), Error> {
)
.await?;
match res {
Some((items, seen_marker)) => {
output_kind.display_poll_range_output(seen_marker, items);
Some(poll_range_output) => {
output_kind.display_poll_range_output(poll_range_output);
}
None => {
if output_kind.json {

View file

@ -182,7 +182,7 @@ impl K2vClient {
filter: Option<PollRangeFilter<'_>>,
seen_marker: Option<&str>,
timeout: Option<Duration>,
) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
) -> Result<Option<PollRangeResult>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
let request = PollRangeRequest {
@ -217,7 +217,10 @@ impl K2vClient {
})
.collect::<BTreeMap<_, _>>();
Ok(Some((items, resp.seen_marker)))
Ok(Some(PollRangeResult {
items,
seen_marker: resp.seen_marker,
}))
}
/// Perform an InsertItem request, inserting a value for a single pk+sk.
@ -570,6 +573,7 @@ pub struct Filter<'a> {
pub reverse: bool,
}
/// Filter for a poll range operations.
#[derive(Debug, Default, Clone, Serialize)]
pub struct PollRangeFilter<'a> {
pub start: Option<&'a str>,
@ -577,6 +581,15 @@ pub struct PollRangeFilter<'a> {
pub prefix: Option<&'a str>,
}
/// Response to a poll_range query
#[derive(Debug, Default, Clone, Serialize)]
pub struct PollRangeResult {
/// List of items that have changed since last PollRange call.
pub items: BTreeMap<String, CausalValue>,
/// opaque string representing items already seen for future PollRange calls.
pub seen_marker: String,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeRequest<'a> {