Merge branch 'main' into next

This commit is contained in:
Alex 2023-06-13 17:02:42 +02:00
commit 90b2d43eb4
35 changed files with 3038 additions and 2545 deletions

1460
Cargo.lock generated

File diff suppressed because it is too large Load diff

3257
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -11,11 +11,13 @@ members = [
"src/web", "src/web",
"src/garage", "src/garage",
"src/k2v-client", "src/k2v-client",
"src/format-table",
] ]
default-members = ["src/garage"] default-members = ["src/garage"]
[workspace.dependencies] [workspace.dependencies]
format_table = { version = "0.1.1", path = "src/format-table" }
garage_api = { version = "0.8.2", path = "src/api" } garage_api = { version = "0.8.2", path = "src/api" }
garage_block = { version = "0.8.2", path = "src/block" } garage_block = { version = "0.8.2", path = "src/block" }
garage_db = { version = "0.8.2", path = "src/db", default-features = false } garage_db = { version = "0.8.2", path = "src/db", default-features = false }
@ -24,6 +26,7 @@ garage_rpc = { version = "0.8.2", path = "src/rpc" }
garage_table = { version = "0.8.2", path = "src/table" } garage_table = { version = "0.8.2", path = "src/table" }
garage_util = { version = "0.8.2", path = "src/util" } garage_util = { version = "0.8.2", path = "src/util" }
garage_web = { version = "0.8.2", path = "src/web" } garage_web = { version = "0.8.2", path = "src/web" }
k2v-client = { version = "0.0.4", path = "src/k2v-client" }
[profile.dev] [profile.dev]
lto = "off" lto = "off"

View file

@ -25,7 +25,7 @@ git clone https://git.deuxfleurs.fr/Deuxfleurs/garage
cd garage cd garage
``` ```
*Optionnaly, you can use our nix.conf file to speed up compilations:* *Optionally, you can use our nix.conf file to speed up compilations:*
```bash ```bash
sudo mkdir -p /etc/nix sudo mkdir -p /etc/nix

View file

@ -37,12 +37,18 @@ bootstrap_peers = [
[consul_discovery] [consul_discovery]
api = "catalog"
consul_http_addr = "http://127.0.0.1:8500" consul_http_addr = "http://127.0.0.1:8500"
service_name = "garage-daemon" service_name = "garage-daemon"
ca_cert = "/etc/consul/consul-ca.crt" ca_cert = "/etc/consul/consul-ca.crt"
client_cert = "/etc/consul/consul-client.crt" client_cert = "/etc/consul/consul-client.crt"
client_key = "/etc/consul/consul-key.crt" client_key = "/etc/consul/consul-key.crt"
# for `agent` API mode, unset client_cert and client_key, and optionally enable `token`
# token = "abcdef-01234-56789"
tls_skip_verify = false tls_skip_verify = false
tags = [ "dns-enabled" ]
meta = { dns-acl = "allow trusted" }
[kubernetes_discovery] [kubernetes_discovery]
namespace = "garage" namespace = "garage"
@ -361,6 +367,12 @@ reached by other nodes of the cluster, which should be set in `rpc_public_addr`.
The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server. The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server.
### `api`
Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using
the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are provided. The `agent` API uses the
`v1/agent` endpoints instead, where an optional `token` may be provided.
### `service_name` ### `service_name`
`service_name` should be set to the service name under which Garage's `service_name` should be set to the service name under which Garage's
@ -369,6 +381,7 @@ RPC ports are announced.
### `client_cert`, `client_key` ### `client_cert`, `client_key`
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
Only available when `api = "catalog"`.
### `ca_cert` ### `ca_cert`
@ -379,6 +392,29 @@ TLS CA certificate to use when communicating with Consul over TLS.
Skip server hostname verification in TLS handshake. Skip server hostname verification in TLS handshake.
`ca_cert` is ignored when this is set. `ca_cert` is ignored when this is set.
### `token`
Uses the provided token for communication with Consul. Only available when `api = "agent"`.
The policy assigned to this token should at least have these rules:
```hcl
// the `service_name` specified above
service "garage" {
policy = "write"
}
service_prefix "" {
policy = "read"
}
node_prefix "" {
policy = "read"
}
```
### `tags` and `meta`
Additional list of tags and map of service meta to add during service registration.
## The `[kubernetes_discovery]` section ## The `[kubernetes_discovery]` section

View file

@ -105,7 +105,7 @@ impl AdminApiServer {
let bucket_id = self let bucket_id = self
.garage .garage
.bucket_helper() .bucket_helper()
.resolve_global_bucket_name(&domain) .resolve_global_bucket_name(domain)
.await? .await?
.ok_or(HelperError::NoSuchBucket(domain.to_string()))?; .ok_or(HelperError::NoSuchBucket(domain.to_string()))?;

View file

@ -128,7 +128,7 @@ impl<A: ApiHandler> ApiServer<A> {
let uri = req.uri().clone(); let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) = if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(&req.headers()) forwarded_headers::handle_forwarded_for_headers(req.headers())
{ {
info!( info!(
"{} (via {}) {} {}", "{} (via {}) {} {}",

View file

@ -282,8 +282,8 @@ pub(crate) async fn handle_poll_range(
if let Some((items, seen_marker)) = resp { if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse { let resp = PollRangeResponse {
items: items items: items
.into_iter() .into_values()
.map(|(_k, i)| ReadBatchResponseItem::from(i)) .map(ReadBatchResponseItem::from)
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
seen_marker, seen_marker,
}; };

View file

@ -19,7 +19,7 @@ use crate::signature::error::*;
pub async fn check_payload_signature( pub async fn check_payload_signature(
garage: &Garage, garage: &Garage,
service: &str, service: &'static str,
request: &Request<Body>, request: &Request<Body>,
) -> Result<(Option<Key>, Option<Hash>), Error> { ) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new(); let mut headers = HashMap::new();
@ -51,6 +51,7 @@ pub async fn check_payload_signature(
}; };
let canonical_request = canonical_request( let canonical_request = canonical_request(
service,
request.method(), request.method(),
request.uri(), request.uri(),
&headers, &headers,
@ -184,7 +185,7 @@ fn parse_query_authorization(
if duration > 7 * 24 * 3600 { if duration > 7 * 24 * 3600 {
return Err(Error::bad_request( return Err(Error::bad_request(
"X-Amz-Exprires may not exceed a week".to_string(), "X-Amz-Expires may not exceed a week".to_string(),
)); ));
} }
@ -210,7 +211,7 @@ fn parse_query_authorization(
fn parse_credential(cred: &str) -> Result<(String, String), Error> { fn parse_credential(cred: &str) -> Result<(String, String), Error> {
let first_slash = cred let first_slash = cred
.find('/') .find('/')
.ok_or_bad_request("Credentials does not contain / in authorization field")?; .ok_or_bad_request("Credentials does not contain '/' in authorization field")?;
let (key_id, scope) = cred.split_at(first_slash); let (key_id, scope) = cred.split_at(first_slash);
Ok(( Ok((
key_id.to_string(), key_id.to_string(),
@ -231,15 +232,50 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re
} }
pub fn canonical_request( pub fn canonical_request(
service: &'static str,
method: &Method, method: &Method,
uri: &hyper::Uri, uri: &hyper::Uri,
headers: &HashMap<String, String>, headers: &HashMap<String, String>,
signed_headers: &str, signed_headers: &str,
content_sha256: &str, content_sha256: &str,
) -> String { ) -> String {
// There seems to be evidence that in AWSv4 signatures, the path component is url-encoded
// a second time when building the canonical request, as specified in this documentation page:
// -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html
// However this documentation page is for a specific service ("roles anywhere"), and
// in the S3 service we know for a fact that there is no double-urlencoding, because all of
// the tests we made with external software work without it.
//
// The theory is that double-urlencoding occurs for all services except S3,
// which is what is implemented in rusoto_signature:
// -> https://docs.rs/rusoto_signature/latest/src/rusoto_signature/signature.rs.html#464
//
// Digging into the code of the official AWS Rust SDK, we learn that double-URI-encoding can
// be set or unset on a per-request basis (the signature crates, aws-sigv4 and aws-sig-auth,
// are agnostic to this). Grepping the codebase confirms that S3 is the only API for which
// double_uri_encode is set to false, meaning it is true (its default value) for all other
// AWS services. We will therefore implement this behavior in Garage as well.
//
// Note that this documentation page, which is touted as the "authoritative reference" on
// AWSv4 signatures, makes no mention of either single- or double-urlencoding:
// -> https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
// This page of the S3 documentation does also not mention anything specific:
// -> https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
//
// Note that there is also the issue of path normalization, which I hope is unrelated to the
// one of URI-encoding. At least in aws-sigv4 both parameters can be set independently,
// and rusoto_signature does not seem to do any effective path normalization, even though
// it mentions it in the comments (same link to the souce code as above).
// We make the explicit choice of NOT normalizing paths in the K2V API because doing so
// would make non-normalized paths invalid K2V partition keys, and we don't want that.
let path: std::borrow::Cow<str> = if service != "s3" {
uri_encode(uri.path(), false).into()
} else {
uri.path().into()
};
[ [
method.as_str(), method.as_str(),
uri.path(), &path,
&canonical_query_string(uri), &canonical_query_string(uri),
&canonical_header_string(headers, signed_headers), &canonical_header_string(headers, signed_headers),
"", "",

View file

@ -37,7 +37,7 @@ serde_bytes = "0.11"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-util = { version = "0.6", features = ["io"] } tokio-util = { version = "0.7", features = ["io"] }
[features] [features]
system-libs = [ "zstd/pkg-config" ] system-libs = [ "zstd/pkg-config" ]

View file

@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
// balance scrub load across different cluster nodes. // balance scrub load across different cluster nodes.
let next_run_timestamp = timestamp timestamp
+ SCRUB_INTERVAL + SCRUB_INTERVAL
.saturating_add(Duration::from_secs( .saturating_add(Duration::from_secs(
rand::thread_rng().gen_range(0..3600 * 24 * 10), rand::thread_rng().gen_range(0..3600 * 24 * 10),
)) ))
.as_millis() as u64; .as_millis() as u64
next_run_timestamp
} }
impl Default for ScrubWorkerPersisted { impl Default for ScrubWorkerPersisted {
@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted {
} }
} }
#[derive(Default)]
enum ScrubWorkerState { enum ScrubWorkerState {
Running(BlockStoreIterator), Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
#[default]
Finished, Finished,
} }
impl Default for ScrubWorkerState {
fn default() -> Self {
ScrubWorkerState::Finished
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum ScrubWorkerCommand { pub enum ScrubWorkerCommand {
Start, Start,

View file

@ -0,0 +1,12 @@
[package]
name = "format_table"
version = "0.1.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Format tables with a stupid API"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "README.md"
[lib]
path = "lib.rs"

View file

@ -0,0 +1,13 @@
# `format_table`
Format tables with a stupid API. [Documentation](https://docs.rs/format_table).
Example:
```rust
let mut table = vec!["product\tquantity\tprice".to_string()];
for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
table.push(format!("{}\t{}\t{}", p, q, r));
}
format_table::format_table(table);
```

View file

@ -1,3 +1,19 @@
//! Format tables with a stupid API.
//!
//! Example:
//!
//! ```rust
//! let mut table = vec!["product\tquantity\tprice".to_string()];
//! for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
//! table.push(format!("{}\t{}\t{}", p, q, r));
//! }
//! format_table::format_table(table);
//! ```
//!
//! A table to be formatted is a `Vec<String>`, containing one string per line.
//! Table columns in each line are separated by a `\t` character.
/// Format a table and return the result as a string.
pub fn format_table_to_string(data: Vec<String>) -> String { pub fn format_table_to_string(data: Vec<String>) -> String {
let data = data let data = data
.iter() .iter()
@ -27,6 +43,7 @@ pub fn format_table_to_string(data: Vec<String>) -> String {
out out
} }
/// Format a table and print the result to stdout.
pub fn format_table(data: Vec<String>) { pub fn format_table(data: Vec<String>) {
print!("{}", format_table_to_string(data)); print!("{}", format_table_to_string(data));
} }

View file

@ -21,6 +21,7 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
format_table.workspace = true
garage_db.workspace = true garage_db.workspace = true
garage_api.workspace = true garage_api.workspace = true
garage_block.workspace = true garage_block.workspace = true
@ -72,6 +73,8 @@ assert-json-diff = "2.0"
serde_json = "1.0" serde_json = "1.0"
base64 = "0.21" base64 = "0.21"
k2v-client.workspace = true
[features] [features]
default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ] default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ]

View file

@ -9,10 +9,11 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::formater::format_table_to_string;
use garage_table::replication::*; use garage_table::replication::*;
use garage_table::*; use garage_table::*;

View file

@ -1,8 +1,8 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::time::Duration; use std::time::Duration;
use format_table::format_table;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table;
use garage_rpc::layout::*; use garage_rpc::layout::*;
use garage_rpc::system::*; use garage_rpc::system::*;

View file

@ -1,8 +1,8 @@
use bytesize::ByteSize; use bytesize::ByteSize;
use format_table::format_table;
use garage_util::crdt::Crdt; use garage_util::crdt::Crdt;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table;
use garage_rpc::layout::*; use garage_rpc::layout::*;
use garage_rpc::system::*; use garage_rpc::system::*;

View file

@ -1,11 +1,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration; use std::time::Duration;
use format_table::format_table;
use garage_util::background::*; use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*; use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo; use garage_block::manager::BlockResyncErrorInfo;
@ -383,13 +383,18 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el { for e in el {
let next_try = if e.next_try > now {
tf2.convert(Duration::from_millis(e.next_try - now))
} else {
"asap".to_string()
};
table.push(format!( table.push(format!(
"{}\t{}\t{}\t{}\tin {}", "{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()), hex::encode(e.hash.as_slice()),
e.refcount, e.refcount,
e.error_count, e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)), tf.convert(Duration::from_millis(now - e.last_try)),
tf2.convert(Duration::from_millis(e.next_try - now)) next_try
)); ));
} }
format_table(table); format_table(table);

View file

@ -209,6 +209,7 @@ impl<'a> RequestBuilder<'a> {
all_headers.extend(self.unsigned_headers.clone()); all_headers.extend(self.unsigned_headers.clone());
let canonical_request = signature::payload::canonical_request( let canonical_request = signature::payload::canonical_request(
self.service,
&self.method, &self.method,
&Uri::try_from(&uri).unwrap(), &Uri::try_from(&uri).unwrap(),
&all_headers, &all_headers,

View file

@ -1,5 +1,6 @@
use aws_sdk_s3::{Client, Region}; use aws_sdk_s3::{Client, Region};
use ext::*; use ext::*;
use k2v_client::K2vClient;
#[macro_use] #[macro_use]
pub mod macros; pub mod macros;
@ -68,6 +69,19 @@ impl Context {
bucket_name bucket_name
} }
/// Build a K2vClient for a given bucket
pub fn k2v_client(&self, bucket: &str) -> K2vClient {
let config = k2v_client::K2vClientConfig {
region: REGION.to_string(),
endpoint: self.garage.k2v_uri().to_string(),
aws_access_key_id: self.key.id.clone(),
aws_secret_access_key: self.key.secret.clone(),
bucket: bucket.to_string(),
user_agent: None,
};
K2vClient::new(config).expect("Could not create K2V client")
}
} }
pub fn context() -> Context { pub fn context() -> Context {

View file

@ -0,0 +1 @@
pub mod simple;

View file

@ -0,0 +1,60 @@
use std::time::Duration;
use k2v_client::*;
use crate::common;
#[tokio::test]
async fn test_simple() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-client-simple");
let k2v_client = ctx.k2v_client(&bucket);
k2v_client
.insert_item("root", "test1", b"Hello, world!".to_vec(), None)
.await
.unwrap();
let res = k2v_client.read_item("root", "test1").await.unwrap();
assert_eq!(res.value.len(), 1);
assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
}
#[tokio::test]
async fn test_special_chars() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars");
let k2v_client = ctx.k2v_client(&bucket);
let (pk, sk) = ("root@plépp", "≤≤««");
k2v_client
.insert_item(pk, sk, b"Hello, world!".to_vec(), None)
.await
.unwrap();
let res = k2v_client.read_item(pk, sk).await.unwrap();
assert_eq!(res.value.len(), 1);
assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
// sleep a bit before read_index
tokio::time::sleep(Duration::from_secs(1)).await;
let res = k2v_client.read_index(Default::default()).await.unwrap();
assert_eq!(res.items.len(), 1);
assert_eq!(res.items.keys().next().unwrap(), pk);
let res = k2v_client
.read_batch(&[BatchReadOp {
partition_key: pk,
filter: Default::default(),
single_item: false,
conflicts_only: false,
tombstones: false,
}])
.await
.unwrap();
assert_eq!(res.len(), 1);
let res = &res[0];
assert_eq!(res.items.len(), 1);
assert_eq!(res.items.keys().next().unwrap(), sk);
}

View file

@ -8,3 +8,5 @@ mod s3;
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
mod k2v; mod k2v;
#[cfg(feature = "k2v")]
mod k2v_client;

View file

@ -1,6 +1,6 @@
[package] [package]
name = "k2v-client" name = "k2v-client"
version = "0.1.1" version = "0.0.4"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"] authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"
@ -10,24 +10,28 @@ readme = "../../README.md"
[dependencies] [dependencies]
base64 = "0.21" base64 = "0.21"
sha2 = "0.10"
hex = "0.4"
http = "0.2" http = "0.2"
log = "0.4" log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } aws-sigv4 = "0.55"
rusoto_credential = "0.48.0" percent-encoding = "2.2"
rusoto_signature = "0.48.0" hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] } hyper-rustls = { version = "0.24", features = ["http2"] }
serde = "1.0" serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0" thiserror = "1.0"
tokio = "1.24" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
# cli deps # cli deps
clap = { version = "4.1", optional = true, features = ["derive", "env"] } clap = { version = "4.1", optional = true, features = ["derive", "env"] }
garage_util = { workspace = true, optional = true } format_table = { workspace = true, optional = true }
tracing = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] }
[features] [features]
cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"] cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"]
[lib] [lib]
path = "lib.rs" path = "lib.rs"

View file

@ -2,12 +2,11 @@ use std::collections::BTreeMap;
use std::process::exit; use std::process::exit;
use std::time::Duration; use std::time::Duration;
use base64::prelude::*;
use k2v_client::*; use k2v_client::*;
use garage_util::formater::format_table; use format_table::format_table;
use rusoto_core::credential::AwsCredentials;
use rusoto_core::Region;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
@ -155,7 +154,9 @@ impl Value {
if let Some(ref text) = self.text { if let Some(ref text) = self.text {
Ok(text.as_bytes().to_vec()) Ok(text.as_bytes().to_vec())
} else if let Some(ref b64) = self.b64 { } else if let Some(ref b64) = self.b64 {
base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into())) BASE64_STANDARD
.decode(b64)
.map_err(|_| Error::Message("invalid base64 input".into()))
} else if let Some(ref path) = self.file { } else if let Some(ref path) = self.file {
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
if path == "-" { if path == "-" {
@ -230,7 +231,7 @@ impl ReadOutputKind {
for val in val.value { for val in val.value {
match val { match val {
K2vValue::Value(v) => { K2vValue::Value(v) => {
println!("{}", base64::encode(&v)) println!("{}", BASE64_STANDARD.encode(&v))
} }
K2vValue::Tombstone => { K2vValue::Tombstone => {
println!(); println!();
@ -249,7 +250,7 @@ impl ReadOutputKind {
if let Ok(string) = std::str::from_utf8(&v) { if let Ok(string) = std::str::from_utf8(&v) {
println!(" utf-8: {}", string); println!(" utf-8: {}", string);
} else { } else {
println!(" base64: {}", base64::encode(&v)); println!(" base64: {}", BASE64_STANDARD.encode(&v));
} }
} }
K2vValue::Tombstone => { K2vValue::Tombstone => {
@ -275,7 +276,7 @@ struct BatchOutputKind {
impl BatchOutputKind { impl BatchOutputKind {
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! { fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
for (key, values) in values { for (key, values) in values {
println!("key: {}", key); println!("sort_key: {}", key);
let causality: String = values.causality.into(); let causality: String = values.causality.into();
println!("causality: {}", causality); println!("causality: {}", causality);
for value in values.value { for value in values.value {
@ -284,7 +285,7 @@ impl BatchOutputKind {
if let Ok(string) = std::str::from_utf8(&v) { if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string); println!(" value(utf-8): {}", string);
} else { } else {
println!(" value(base64): {}", base64::encode(&v)); println!(" value(base64): {}", BASE64_STANDARD.encode(&v));
} }
} }
K2vValue::Tombstone => { K2vValue::Tombstone => {
@ -393,16 +394,27 @@ impl Filter {
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "warn")
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.init();
let args = Args::parse(); let args = Args::parse();
let region = Region::Custom { let config = K2vClientConfig {
name: args.region,
endpoint: args.endpoint, endpoint: args.endpoint,
region: args.region,
aws_access_key_id: args.key_id,
aws_secret_access_key: args.secret,
bucket: args.bucket,
user_agent: None,
}; };
let creds = AwsCredentials::new(args.key_id, args.secret, None, None); let client = K2vClient::new(config)?;
let client = K2vClient::new(region, args.bucket, creds, None)?;
match args.command { match args.command {
Command::Insert { Command::Insert {
@ -520,7 +532,7 @@ async fn main() -> Result<(), Error> {
value value
.as_object_mut() .as_object_mut()
.unwrap() .unwrap()
.insert("sort_key".to_owned(), k.into()); .insert("partition_key".to_owned(), k.into());
value value
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -537,7 +549,7 @@ async fn main() -> Result<(), Error> {
} }
let mut to_print = Vec::new(); let mut to_print = Vec::new();
to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes")); to_print.push(format!("partition_key\tentries\tconflicts\tvalues\tbytes"));
for (k, v) in res.items { for (k, v) in res.items {
to_print.push(format!( to_print.push(format!(
"{}\t{}\t{}\t{}\t{}", "{}\t{}\t{}\t{}\t{}",

View file

@ -18,12 +18,20 @@ pub enum Error {
NotFound, NotFound,
#[error("io error: {0}")] #[error("io error: {0}")]
IoError(#[from] std::io::Error), IoError(#[from] std::io::Error),
#[error("rusoto tls error: {0}")] #[error("http error: {0}")]
RusotoTls(#[from] rusoto_core::request::TlsError), Http(#[from] http::Error),
#[error("rusoto http error: {0}")] #[error("hyper error: {0}")]
RusotoHttp(#[from] rusoto_core::HttpDispatchError), Hyper(#[from] hyper::Error),
#[error("invalid header: {0}")]
Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")] #[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error), Deserialization(#[from] serde_json::Error),
#[error("invalid signature parameters: {0}")]
SignParameters(#[from] aws_sigv4::signing_params::BuildError),
#[error("could not sign request: {0}")]
SignRequest(#[from] aws_sigv4::http_request::SigningError),
#[error("request timed out")]
Timeout,
#[error("{0}")] #[error("{0}")]
Message(Cow<'static, str>), Message(Cow<'static, str>),
} }

View file

@ -1,20 +1,23 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::time::Duration; use std::convert::TryInto;
use std::time::{Duration, SystemTime};
use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE}; use base64::prelude::*;
use http::status::StatusCode;
use http::HeaderMap;
use log::{debug, error}; use log::{debug, error};
use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
use http::header::{ACCEPT, CONTENT_TYPE};
use http::status::StatusCode;
use http::{HeaderName, HeaderValue, Request};
use hyper::{body::Bytes, Body};
use hyper::{client::connect::HttpConnector, Client as HttpClient};
use hyper_rustls::HttpsConnector;
use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
use rusoto_credential::AwsCredentials;
use rusoto_signature::region::Region;
use rusoto_signature::signature::SignedRequest;
use serde::de::Error as DeError; use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio::io::AsyncReadExt;
mod error; mod error;
pub use error::Error; pub use error::Error;
@ -22,41 +25,57 @@ pub use error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v"; const SERVICE: &str = "k2v";
const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token");
const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
.remove(b'_')
.remove(b'-')
.remove(b'.')
.remove(b'~');
const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
.remove(b'/')
.remove(b'_')
.remove(b'-')
.remove(b'.')
.remove(b'~');
pub struct K2vClientConfig {
pub endpoint: String,
pub region: String,
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
pub bucket: String,
pub user_agent: Option<String>,
}
/// Client used to query a K2V server. /// Client used to query a K2V server.
pub struct K2vClient { pub struct K2vClient {
region: Region, config: K2vClientConfig,
bucket: String, user_agent: HeaderValue,
creds: AwsCredentials, client: HttpClient<HttpsConnector<HttpConnector>>,
client: HttpClient,
} }
impl K2vClient { impl K2vClient {
/// Create a new K2V client. /// Create a new K2V client.
pub fn new( pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
region: Region,
bucket: String,
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new() let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots() .with_native_roots()
.https_or_http() .https_or_http()
.enable_http1() .enable_http1()
.enable_http2() .enable_http2()
.build(); .build();
let mut client = HttpClient::from_connector(connector); let client = HttpClient::builder().build(connector);
if let Some(ua) = user_agent { let user_agent: std::borrow::Cow<str> = match &config.user_agent {
client.local_agent_prepend(ua); Some(ua) => ua.into(),
} else { None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION"))); };
} let user_agent = HeaderValue::from_str(&user_agent)
.map_err(|_| Error::Message("invalid user agent".into()))?;
Ok(K2vClient { Ok(K2vClient {
region, config,
bucket,
creds,
client, client,
user_agent,
}) })
} }
@ -66,15 +85,10 @@ impl K2vClient {
partition_key: &str, partition_key: &str,
sort_key: &str, sort_key: &str,
) -> Result<CausalValue, Error> { ) -> Result<CausalValue, Error> {
let mut req = SignedRequest::new( let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
"GET", let req = Request::get(url)
SERVICE, .header(ACCEPT, "application/octet-stream, application/json")
&self.region, .body(Bytes::new())?;
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("sort_key", sort_key);
req.add_header(ACCEPT, "application/octet-stream, application/json");
let res = self.dispatch(req, None).await?; let res = self.dispatch(req, None).await?;
let causality = res let causality = res
@ -91,7 +105,7 @@ impl K2vClient {
match res.content_type.as_deref() { match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(CausalValue { Some("application/octet-stream") => Ok(CausalValue {
causality, causality,
value: vec![K2vValue::Value(res.body)], value: vec![K2vValue::Value(res.body.to_vec())],
}), }),
Some("application/json") => { Some("application/json") => {
let value = serde_json::from_slice(&res.body)?; let value = serde_json::from_slice(&res.body)?;
@ -115,16 +129,17 @@ impl K2vClient {
) -> Result<Option<CausalValue>, Error> { ) -> Result<Option<CausalValue>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
let mut req = SignedRequest::new( let url = self.build_url(
"GET", Some(partition_key),
SERVICE, &[
&self.region, ("sort_key", sort_key),
&format!("/{}/{}", self.bucket, partition_key), ("causality_token", &causality.0),
("timeout", &timeout.as_secs().to_string()),
],
); );
req.add_param("sort_key", sort_key); let req = Request::get(url)
req.add_param("causality_token", &causality.0); .header(ACCEPT, "application/octet-stream, application/json")
req.add_param("timeout", &timeout.as_secs().to_string()); .body(Bytes::new())?;
req.add_header(ACCEPT, "application/octet-stream, application/json");
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
@ -146,7 +161,7 @@ impl K2vClient {
match res.content_type.as_deref() { match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(Some(CausalValue { Some("application/octet-stream") => Ok(Some(CausalValue {
causality, causality,
value: vec![K2vValue::Value(res.body)], value: vec![K2vValue::Value(res.body.to_vec())],
})), })),
Some("application/json") => { Some("application/json") => {
let value = serde_json::from_slice(&res.body)?; let value = serde_json::from_slice(&res.body)?;
@ -176,16 +191,10 @@ impl K2vClient {
timeout: timeout.as_secs(), timeout: timeout.as_secs(),
}; };
let mut req = SignedRequest::new( let url = self.build_url(Some(partition_key), &[("poll_range", "")]);
"POST",
SERVICE,
&self.region,
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("poll_range", "");
let payload = serde_json::to_vec(&request)?; let payload = serde_json::to_vec(&request)?;
req.set_payload(Some(payload)); let req = Request::post(url).body(Bytes::from(payload))?;
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED { if res.status == StatusCode::NOT_MODIFIED {
@ -219,18 +228,12 @@ impl K2vClient {
value: Vec<u8>, value: Vec<u8>,
causality: Option<CausalityToken>, causality: Option<CausalityToken>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut req = SignedRequest::new( let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
"PUT", let mut req = Request::put(url);
SERVICE,
&self.region,
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("sort_key", sort_key);
req.set_payload(Some(value));
if let Some(causality) = causality { if let Some(causality) = causality {
req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0); req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0);
} }
let req = req.body(Bytes::from(value))?;
self.dispatch(req, None).await?; self.dispatch(req, None).await?;
Ok(()) Ok(())
@ -243,14 +246,10 @@ impl K2vClient {
sort_key: &str, sort_key: &str,
causality: CausalityToken, causality: CausalityToken,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut req = SignedRequest::new( let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
"DELETE", let req = Request::delete(url)
SERVICE, .header(GARAGE_CAUSALITY_TOKEN, &causality.0)
&self.region, .body(Bytes::new())?;
&format!("/{}/{}", self.bucket, partition_key),
);
req.add_param("sort_key", sort_key);
req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
self.dispatch(req, None).await?; self.dispatch(req, None).await?;
Ok(()) Ok(())
@ -262,9 +261,9 @@ impl K2vClient {
&self, &self,
filter: Filter<'_>, filter: Filter<'_>,
) -> Result<PaginatedRange<PartitionInfo>, Error> { ) -> Result<PaginatedRange<PartitionInfo>, Error> {
let mut req = let params = filter.query_params();
SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket)); let url = self.build_url(None, &params);
filter.insert_params(&mut req); let req = Request::get(url).body(Bytes::new())?;
let res = self.dispatch(req, None).await?; let res = self.dispatch(req, None).await?;
@ -286,11 +285,10 @@ impl K2vClient {
/// *not* atomic: it is possible for some sub-operations to fails and others to success. In /// *not* atomic: it is possible for some sub-operations to fails and others to success. In
/// that case, failure is reported. /// that case, failure is reported.
pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> { pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {
let mut req = let url = self.build_url::<&str>(None, &[]);
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
let payload = serde_json::to_vec(operations)?; let payload = serde_json::to_vec(operations)?;
req.set_payload(Some(payload)); let req = Request::post(url).body(payload.into())?;
self.dispatch(req, None).await?; self.dispatch(req, None).await?;
Ok(()) Ok(())
} }
@ -300,12 +298,10 @@ impl K2vClient {
&self, &self,
operations: &[BatchReadOp<'_>], operations: &[BatchReadOp<'_>],
) -> Result<Vec<PaginatedRange<CausalValue>>, Error> { ) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
let mut req = let url = self.build_url(None, &[("search", "")]);
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
req.add_param("search", "");
let payload = serde_json::to_vec(operations)?; let payload = serde_json::to_vec(operations)?;
req.set_payload(Some(payload)); let req = Request::post(url).body(payload.into())?;
let res = self.dispatch(req, None).await?; let res = self.dispatch(req, None).await?;
let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?; let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
@ -334,12 +330,10 @@ impl K2vClient {
/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without /// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
/// providing causality information. /// providing causality information.
pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> { pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
let mut req = let url = self.build_url(None, &[("delete", "")]);
SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
req.add_param("delete", "");
let payload = serde_json::to_vec(operations)?; let payload = serde_json::to_vec(operations)?;
req.set_payload(Some(payload)); let req = Request::post(url).body(payload.into())?;
let res = self.dispatch(req, None).await?; let res = self.dispatch(req, None).await?;
let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?; let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
@ -349,33 +343,67 @@ impl K2vClient {
async fn dispatch( async fn dispatch(
&self, &self,
mut req: SignedRequest, mut req: Request<Bytes>,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<Response, Error> { ) -> Result<Response, Error> {
req.sign(&self.creds); req.headers_mut()
let mut res = self .insert(http::header::USER_AGENT, self.user_agent.clone());
.client
.dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
.await?;
let causality_token = res use sha2::{Digest, Sha256};
.headers let mut hasher = Sha256::new();
.remove(GARAGE_CAUSALITY_TOKEN) hasher.update(req.body());
.map(CausalityToken); let hash = hex::encode(&hasher.finalize());
let content_type = res.headers.remove(CONTENT_TYPE); req.headers_mut()
.insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap());
debug!("request uri: {:?}", req.uri());
// Sign request
let signing_settings = SigningSettings::default();
let signing_params = SigningParams::builder()
.access_key(&self.config.aws_access_key_id)
.secret_key(&self.config.aws_secret_access_key)
.region(&self.config.region)
.service_name(SERVICE)
.time(SystemTime::now())
.settings(signing_settings)
.build()?;
// Convert the HTTP request into a signable request
let signable_request = SignableRequest::from(&req);
// Sign and then apply the signature to the request
let (signing_instructions, _signature) =
sign(signable_request, &signing_params)?.into_parts();
signing_instructions.apply_to_request(&mut req);
// Send and wait for timeout
let res = tokio::select! {
res = self.client.request(req.map(Body::from)) => res?,
_ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => {
return Err(Error::Timeout);
}
};
let (mut res, body) = res.into_parts();
let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) {
Some(v) => Some(CausalityToken(v.to_str()?.to_string())),
None => None,
};
let content_type = match res.headers.remove(CONTENT_TYPE) {
Some(v) => Some(v.to_str()?.to_string()),
None => None,
};
let body = match res.status { let body = match res.status {
StatusCode::OK => read_body(&mut res.headers, res.body).await?, StatusCode::OK => hyper::body::to_bytes(body).await?,
StatusCode::NO_CONTENT => Vec::new(), StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound), StatusCode::NOT_FOUND => return Err(Error::NotFound),
StatusCode::NOT_MODIFIED => Vec::new(), StatusCode::NOT_MODIFIED => Bytes::new(),
s => { s => {
let err_body = read_body(&mut res.headers, res.body) let err_body = hyper::body::to_bytes(body).await.unwrap_or_default();
.await
.unwrap_or_default();
let err_body_str = std::str::from_utf8(&err_body) let err_body_str = std::str::from_utf8(&err_body)
.map(String::from) .map(String::from)
.unwrap_or_else(|_| base64::encode(&err_body)); .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body));
if s.is_client_error() || s.is_server_error() { if s.is_client_error() || s.is_server_error() {
error!("Error response {}: {}", res.status, err_body_str); error!("Error response {}: {}", res.status, err_body_str);
@ -408,7 +436,7 @@ impl K2vClient {
"Response body: {}", "Response body: {}",
std::str::from_utf8(&body) std::str::from_utf8(&body)
.map(String::from) .map(String::from)
.unwrap_or_else(|_| base64::encode(&body)) .unwrap_or_else(|_| BASE64_STANDARD.encode(&body))
); );
Ok(Response { Ok(Response {
@ -418,16 +446,26 @@ impl K2vClient {
content_type, content_type,
}) })
} }
}
async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> { fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String {
let body_len = headers let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket);
.get(CONTENT_LENGTH) if let Some(pk) = partition_key {
.and_then(|h| h.parse().ok()) url.push('/');
.unwrap_or(0); url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET));
let mut res = Vec::with_capacity(body_len); }
body.into_async_read().read_to_end(&mut res).await?; if !query.is_empty() {
Ok(res) url.push('?');
for (i, (k, v)) in query.iter().enumerate() {
if i > 0 {
url.push('&');
}
url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET));
url.push('=');
url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET));
}
}
url
}
} }
/// An opaque token used to convey causality between operations. /// An opaque token used to convey causality between operations.
@ -482,9 +520,11 @@ impl<'de> Deserialize<'de> for K2vValue {
{ {
let val: Option<&str> = Option::deserialize(d)?; let val: Option<&str> = Option::deserialize(d)?;
Ok(match val { Ok(match val {
Some(s) => { Some(s) => K2vValue::Value(
K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?) BASE64_STANDARD
} .decode(s)
.map_err(|_| DeError::custom("invalid base64"))?,
),
None => K2vValue::Tombstone, None => K2vValue::Tombstone,
}) })
} }
@ -498,7 +538,7 @@ impl Serialize for K2vValue {
match self { match self {
K2vValue::Tombstone => serializer.serialize_none(), K2vValue::Tombstone => serializer.serialize_none(),
K2vValue::Value(v) => { K2vValue::Value(v) => {
let b64 = base64::encode(v); let b64 = BASE64_STANDARD.encode(v);
serializer.serialize_str(&b64) serializer.serialize_str(&b64)
} }
} }
@ -554,22 +594,24 @@ struct PollRangeResponse {
} }
impl<'a> Filter<'a> { impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) { fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> {
if let Some(start) = &self.start { let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8);
req.add_param("start", start); if let Some(start) = self.start.as_deref() {
res.push(("start", start.into()));
} }
if let Some(end) = &self.end { if let Some(end) = self.end.as_deref() {
req.add_param("end", end); res.push(("end", end.into()));
} }
if let Some(prefix) = &self.prefix { if let Some(prefix) = self.prefix.as_deref() {
req.add_param("prefix", prefix); res.push(("prefix", prefix.into()));
} }
if let Some(limit) = &self.limit { if let Some(limit) = &self.limit {
req.add_param("limit", &limit.to_string()); res.push(("limit", limit.to_string().into()));
} }
if self.reverse { if self.reverse {
req.add_param("reverse", "true"); res.push(("reverse", "true".into()));
} }
res
} }
} }
@ -691,7 +733,7 @@ struct ErrorResponse {
} }
struct Response { struct Response {
body: Vec<u8>, body: Bytes,
status: StatusCode, status: StatusCode,
causality_token: Option<CausalityToken>, causality_token: Option<CausalityToken>,
content_type: Option<String>, content_type: Option<String>,

View file

@ -79,7 +79,7 @@ impl RangeSeenMarker {
let bytes = nonversioned_encode(&self)?; let bytes = nonversioned_encode(&self)?;
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
Ok(BASE64_STANDARD.encode(&bytes)) Ok(BASE64_STANDARD.encode(bytes))
} }
/// Decode from msgpack+zstd+b64 representation, returns None on error. /// Decode from msgpack+zstd+b64 representation, returns None on error.

View file

@ -8,16 +8,26 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID; use netapp::NodeID;
use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig; use garage_util::config::ConsulDiscoveryConfig;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)] #[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry { struct ConsulQueryEntry {
#[serde(rename = "Address")] #[serde(rename = "Address")]
address: String, address: String,
#[serde(rename = "ServicePort")] #[serde(rename = "ServicePort")]
service_port: u16, service_port: u16,
#[serde(rename = "NodeMeta")] #[serde(rename = "ServiceMeta")]
node_meta: HashMap<String, String>, meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
enum PublishRequest {
Catalog(ConsulPublishEntry),
Service(ConsulPublishService),
} }
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
@ -26,17 +36,31 @@ struct ConsulPublishEntry {
node: String, node: String,
#[serde(rename = "Address")] #[serde(rename = "Address")]
address: IpAddr, address: IpAddr,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "Service")] #[serde(rename = "Service")]
service: ConsulPublishService, service: ConsulPublishCatalogService,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
} }
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
struct ConsulPublishService { struct ConsulPublishService {
#[serde(rename = "ID")] #[serde(rename = "ID")]
service_id: String, service_id: String,
#[serde(rename = "Service")] #[serde(rename = "Name")]
service_name: String, service_name: String,
#[serde(rename = "Tags")] #[serde(rename = "Tags")]
tags: Vec<String>, tags: Vec<String>,
@ -44,10 +68,11 @@ struct ConsulPublishService {
address: IpAddr, address: IpAddr,
#[serde(rename = "Port")] #[serde(rename = "Port")]
port: u16, port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
} }
// ---- // ----
pub struct ConsulDiscovery { pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig, config: ConsulDiscoveryConfig,
client: reqwest::Client, client: reqwest::Client,
@ -55,44 +80,48 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery { impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let client = match (&config.client_cert, &config.client_key) { let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls();
(Some(client_cert), Some(client_key)) => { if config.tls_skip_verify {
let mut client_cert_buf = vec![]; builder = builder.danger_accept_invalid_certs(true);
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; } else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
builder =
builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
}
let mut client_key_buf = vec![]; match &config.api {
File::open(client_key)?.read_to_end(&mut client_key_buf)?; ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let identity = reqwest::Identity::from_pem( let mut client_key_buf = vec![];
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..], File::open(client_key)?.read_to_end(&mut client_key_buf)?;
)?;
if config.tls_skip_verify { let identity = reqwest::Identity::from_pem(
reqwest::Client::builder() &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
.use_rustls_tls() )?;
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
reqwest::Client::builder() builder = builder.identity(identity);
.use_rustls_tls() }
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) (None, None) => {}
.identity(identity) _ => return Err(ConsulError::InvalidTLSConfig),
.build()? },
} else { ConsulDiscoveryAPI::Agent => {
reqwest::Client::builder() if let Some(token) = &config.token {
.use_rustls_tls() let mut headers = reqwest::header::HeaderMap::new();
.identity(identity) headers.insert(
.build()? "x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
} }
} }
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
}; };
let client: reqwest::Client = builder.build()?;
Ok(Self { client, config }) Ok(Self { client, config })
} }
@ -111,8 +140,8 @@ impl ConsulDiscovery {
for ent in entries { for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok(); let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent let pubkey = ent
.node_meta .meta
.get("pubkey") .get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok()) .and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..])); .and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) { if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@ -138,29 +167,49 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr, rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> { ) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8])); let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let advertisement = ConsulPublishEntry { let mut meta = self.config.meta.clone().unwrap_or_default();
node: node.clone(), meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id));
address: rpc_public_addr.ip(), meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string());
node_meta: [
("pubkey".to_string(), hex::encode(node_id)), let url = format!(
("hostname".to_string(), hostname.to_string()), "{}/v1/{}",
] self.config.consul_http_addr,
.iter() (match &self.config.api {
.cloned() ConsulDiscoveryAPI::Catalog => "catalog/register",
.collect(), ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks",
service: ConsulPublishService { })
);
let req = self.client.put(&url);
let advertisement: PublishRequest = match &self.config.api {
ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
service: ConsulPublishCatalogService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta: meta.clone(),
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
}),
ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService {
service_id: node.clone(), service_id: node.clone(),
service_name: self.config.service_name.clone(), service_name: self.config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()], tags,
meta,
address: rpc_public_addr.ip(), address: rpc_public_addr.ip(),
port: rpc_public_addr.port(), port: rpc_public_addr.port(),
}, }),
}; };
let http = req.json(&advertisement).send().await?;
let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
let http = self.client.put(&url).json(&advertisement).send().await?;
http.error_for_status()?; http.error_for_status()?;
Ok(()) Ok(())
@ -176,4 +225,6 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error), Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")] #[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig, InvalidTLSConfig,
#[error(display = "Token error: {}", _0)]
Token(#[error(source)] reqwest::header::InvalidHeaderValue),
} }

View file

@ -44,22 +44,22 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
impl<F: TableSchema, R: TableReplication> TableData<F, R> { impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME)) .open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let merkle_tree = db let merkle_tree = db
.open_tree(&format!("{}:merkle_tree", F::TABLE_NAME)) .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree"); .expect("Unable to open DB Merkle tree tree");
let merkle_todo = db let merkle_todo = db
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree"); .expect("Unable to open DB Merkle TODO tree");
let insert_queue = db let insert_queue = db
.open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) .open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree"); .expect("Unable to open insert queue DB tree");
let gc_todo = db let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) .open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree"); .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
@ -90,7 +90,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s); let tree_key = self.tree_key(p, s);
if let Some(bytes) = self.store.get(&tree_key)? { if let Some(bytes) = self.store.get(tree_key)? {
Ok(Some(ByteBuf::from(bytes.to_vec()))) Ok(Some(ByteBuf::from(bytes.to_vec())))
} else { } else {
Ok(None) Ok(None)
@ -132,10 +132,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
} }
} }
fn read_range_aux<'a>( fn read_range_aux(
&self, &self,
partition_hash: Hash, partition_hash: Hash,
range: db::ValueIter<'a>, range: db::ValueIter,
filter: &Option<F::Filter>, filter: &Option<F::Filter>,
limit: usize, limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> { ) -> Result<Vec<Arc<ByteBuf>>, Error> {

View file

@ -34,8 +34,9 @@ impl DeletedFilter {
} }
} }
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum EnumerationOrder { pub enum EnumerationOrder {
#[default]
Forward, Forward,
Reverse, Reverse,
} }
@ -49,9 +50,3 @@ impl EnumerationOrder {
} }
} }
} }
impl Default for EnumerationOrder {
fn default() -> Self {
EnumerationOrder::Forward
}
}

View file

@ -142,8 +142,19 @@ pub struct AdminConfig {
pub trace_sink: Option<String>, pub trace_sink: Option<String>,
} }
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConsulDiscoveryAPI {
#[default]
Catalog,
Agent,
}
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig { pub struct ConsulDiscoveryConfig {
/// The consul api to use when registering: either `catalog` (the default) or `agent`
#[serde(default)]
pub api: ConsulDiscoveryAPI,
/// Consul http or https address to connect to to discover more peers /// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String, pub consul_http_addr: String,
/// Consul service name to use /// Consul service name to use
@ -154,9 +165,17 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>, pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul /// Client TLS key to use when connecting to Consul
pub client_key: Option<String>, pub client_key: Option<String>,
/// /// Token to use for connecting to consul
pub token: Option<String>,
/// Skip TLS hostname verification /// Skip TLS hostname verification
#[serde(default)] #[serde(default)]
pub tls_skip_verify: bool, pub tls_skip_verify: bool,
/// Additional tags to add to the service
#[serde(default)]
pub tags: Vec<String>,
/// Additional service metadata to add
#[serde(default)]
pub meta: Option<std::collections::HashMap<String, String>>,
} }
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
@ -230,7 +249,7 @@ fn secret_from_file(
#[cfg(unix)] #[cfg(unix)]
if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") { if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
use std::os::unix::fs::MetadataExt; use std::os::unix::fs::MetadataExt;
let metadata = std::fs::metadata(&file_path)?; let metadata = std::fs::metadata(file_path)?;
if metadata.mode() & 0o077 != 0 { if metadata.mode() & 0o077 != 0 {
return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into()); return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
} }

View file

@ -13,7 +13,7 @@ pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<
.to_str() .to_str()
.ok_or_message("Error parsing X-Forwarded-For header")?; .ok_or_message("Error parsing X-Forwarded-For header")?;
let client_ip = IpAddr::from_str(&forwarded_for_ip_str) let client_ip = IpAddr::from_str(forwarded_for_ip_str)
.ok_or_message("Valid IP address not found in X-Forwarded-For header")?; .ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
Ok(client_ip.to_string()) Ok(client_ip.to_string())

View file

@ -10,7 +10,6 @@ pub mod crdt;
pub mod data; pub mod data;
pub mod encode; pub mod encode;
pub mod error; pub mod error;
pub mod formater;
pub mod forwarded_headers; pub mod forwarded_headers;
pub mod metrics; pub mod metrics;
pub mod migrate; pub mod migrate;