Compare commits

..

41 commits

Author SHA1 Message Date
15da2156f6 Change position of the node-id argument 2022-03-19 18:03:23 +01:00
0529f3c34d Patch cargo2nix openssl override 2022-03-17 12:17:38 +01:00
db46cdef79
Update netapp to v0.4.1 2022-03-15 17:09:57 +01:00
ba6b56ae68
Fix some new clippy lints 2022-03-14 12:27:49 +01:00
0af314b295
Add comment for fsync 2022-03-14 11:54:00 +01:00
d78bf379fb
Fix resync queue to not drop items 2022-03-14 11:51:37 +01:00
f7e6f4616f
Spawn a single resync worker 2022-03-14 11:51:37 +01:00
dc5ec4ecf9
Add appropriate fsync() calls in write_block
to ensure that data is persisted properly
2022-03-14 11:51:32 +01:00
fe62d01b7e
Implement exponential backoff for resync retries 2022-03-14 11:41:20 +01:00
bfb4353df5
Update Grafana dashboard 2022-03-14 10:55:30 +01:00
9b2b531f4d
Make admin server optional 2022-03-14 10:54:25 +01:00
a19341b188
Add Grafana dashboard for Garage 2022-03-14 10:54:25 +01:00
2377a92f6b
Add wrapper over sled tree to count items (used for big queues) 2022-03-14 10:54:25 +01:00
203e8d2c34
Bump version to 0.7 because of incompatible Netapp 2022-03-14 10:54:24 +01:00
f869ca625d
Add spans to table calls, change span names in RPC 2022-03-14 10:54:12 +01:00
0cc31ee169
add missing netapp telemetry feature 2022-03-14 10:54:11 +01:00
dc8d0496cc
Refactoring: rename config files, make modifications less invasive 2022-03-14 10:53:51 +01:00
d9a35359bf
Add metrics to web endpoint 2022-03-14 10:53:50 +01:00
2a5609b292
Add metrics to API endpoint 2022-03-14 10:53:36 +01:00
818daa5c78
Refactor how durations are measured 2022-03-14 10:53:35 +01:00
f0d0cd9a20
Remove strum crate dependency; add protobuf nix dependency 2022-03-14 10:53:00 +01:00
55d4471599
Remove ... at end of hex IDs 2022-03-14 10:52:31 +01:00
bb04d94fa9
Update to Netapp 0.4 which supports distributed tracing 2022-03-14 10:52:30 +01:00
8c2fb0c066
Add tracing integration with opentelemetry 2022-03-14 10:52:13 +01:00
b6561f6e1b
Add docker-compose for traces & metrics 2022-03-14 10:51:52 +01:00
2cab84b1fe
Add many metrics in table/ and rpc/ 2022-03-14 10:51:50 +01:00
1e2cf26373
Implement basic metrics in table 2022-03-14 10:51:17 +01:00
mricher
e349af13a7
Update dependencies and add admin module with metrics
- Global dependencies updated in Cargo.lock
- New module created in src/admin to host:
  - the (future) admin REST API
  - the metric collection
- add configuration block

No metrics implemented yet
2022-03-14 10:51:12 +01:00
9d44127245
add support for kubernetes service discovery
This commit adds support to discover garage instances running in
kubernetes.

Once enabled by setting `kubernetes_namespace` and
`kubernetes_service_name` garage will create a Custom Resources
`garagenodes.deuxfleurs.fr` with nodes public key as the resource name.
and IP and Port information as spec in the namespace configured by
`kubernetes_namespace`.

For discovering nodes the resources are filtered with the optionally set
`kubernetes_service_name` which sets a label
`garage.deuxfleurs.fr/service` on the resources.

This allows to separate multiple garage deployments in a single
namespace.

the `kubernetes_skip_crd` variable allows to disable the creation of the
CRD by garage itself. The user must deploy this manually.
2022-03-12 13:05:52 +01:00
c00b2c9948 Functional tests for admin commands 2022-03-07 17:32:07 +01:00
8df1e186de Functional tests for website endpoints 2022-03-07 17:32:07 +01:00
2ef60b8417 Functional test for multipart endpoints 2022-03-07 17:32:07 +01:00
1e639ec67c Functional test for ListMultipartUploads 2022-03-07 17:32:07 +01:00
cfea1e0315 Functional tests for bucket endpoints 2022-03-07 17:32:02 +01:00
05eb79929e Functional tests for object operations 2022-03-07 17:05:10 +01:00
0f4e0e8bb9 Move ListObjects tests to Rust 2022-03-07 17:05:10 +01:00
2a3afcaf65 Test WinSCP 2022-03-03 14:29:10 +01:00
8a5bbc3b0b
More permissive OPTIONS on S3 API 2022-03-01 11:15:16 +01:00
97f245f218
Add tracing output to signature calculation 2022-02-28 12:22:39 +01:00
8129a98291
Process CORS earlier in pipeline 2022-02-28 12:22:39 +01:00
54e02b4c3b Force static builds for all platforms 2022-02-24 16:12:37 +01:00
75 changed files with 10348 additions and 1506 deletions

1651
Cargo.lock generated

File diff suppressed because it is too large Load diff

2977
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -4,9 +4,10 @@ members = [
"src/rpc",
"src/table",
"src/model",
"src/admin",
"src/api",
"src/web",
"src/garage",
"src/garage"
]
[profile.dev]

View file

@ -11,14 +11,26 @@ with import ./nix/common.nix;
let
crossSystem = { config = target; };
in let
log = v: builtins.trace v v;
pkgs = import pkgsSrc {
inherit system crossSystem;
overlays = [ cargo2nixOverlay ];
};
/*
Rust and Nix triples are not the same. Cargo2nix has a dedicated library
to convert Nix triples to Rust ones. We need this conversion as we want to
set later options linked to our (rust) target in a generic way. Not only
the triple terminology is different, but also the "roles" are named differently.
Nix uses a build/host/target terminology where Nix's "host" maps to Cargo's "target".
*/
rustTarget = log (pkgs.rustBuilder.rustLib.rustTriple pkgs.stdenv.hostPlatform);
/*
Cargo2nix is built for rustOverlay which installs Rust from Mozilla releases.
We want our own Rust to avoir incompatibilities, like we had with musl 1.2.0.
We want our own Rust to avoid incompatibilities, like we had with musl 1.2.0.
rustc was built with musl < 1.2.0 and nix shipped musl >= 1.2.0 which lead to compilation breakage.
So we want a Rust release that is bound to our Nix repository to avoid these problems.
See here for more info: https://musl.libc.org/time64.html
@ -37,51 +49,69 @@ in let
overrides = pkgs.rustBuilder.overrides.all ++ [
/*
We want to inject the git version while keeping the build deterministic.
[1] We need to alter Nix hardening to be able to statically compile: PIE,
Position Independent Executables seems to be supported only on amd64. Having
this flags set either make our executables crash or compile as dynamic on many platforms.
In the following section codegenOpts, we reactive it for the supported targets
(only amd64 curently) through the `-static-pie` flag. PIE is a feature used
by ASLR, which helps mitigate security issues.
Learn more about Nix Hardening: https://github.com/NixOS/nixpkgs/blob/master/pkgs/build-support/cc-wrapper/add-hardening.sh
[2] We want to inject the git version while keeping the build deterministic.
As we do not want to consider the .git folder as part of the input source,
we ask the user (the CI often) to pass the value to Nix.
*/
(pkgs.rustBuilder.rustLib.makeOverride {
name = "garage";
overrideAttrs = drv: if git_version != null then {
preConfigure = ''
${drv.preConfigure or ""}
export GIT_VERSION="${git_version}"
'';
} else {};
overrideAttrs = drv:
/* [1] */ { hardeningDisable = [ "pie" ]; }
//
/* [2] */ (if git_version != null then {
preConfigure = ''
${drv.preConfigure or ""}
export GIT_VERSION="${git_version}"
'';
} else {});
})
/*
On a sandbox pure NixOS environment, /usr/bin/file is not available.
This is a known problem: https://github.com/NixOS/nixpkgs/issues/98440
We simply patch the file as suggested
*/
/*(pkgs.rustBuilder.rustLib.makeOverride {
name = "libsodium-sys";
overrideAttrs = drv: {
preConfigure = ''
${drv.preConfigure or ""}
sed -i 's,/usr/bin/file,${file}/bin/file,g' ./configure
'';
}
})*/
];
packageFun = import ./Cargo.nix;
/*
We compile fully static binaries with musl to simplify deployment on most systems.
When possible, we reactivate PIE hardening (see above).
Also, if you set the RUSTFLAGS environment variable, the following parameters will
be ignored.
For more information on static builds, please refer to Rust's RFC 1721.
https://rust-lang.github.io/rfcs/1721-crt-static.html#specifying-dynamicstatic-c-runtime-linkage
*/
codegenOpts = {
"armv6l-unknown-linux-musleabihf" = [ "target-feature=+crt-static" "link-arg=-static" ]; /* compile as dynamic with static-pie */
"aarch64-unknown-linux-musl" = [ "target-feature=+crt-static" "link-arg=-static" ]; /* segfault with static-pie */
"i686-unknown-linux-musl" = [ "target-feature=+crt-static" "link-arg=-static" ]; /* segfault with static-pie */
"x86_64-unknown-linux-musl" = [ "target-feature=+crt-static" "link-arg=-static-pie" ];
};
/*
The following definition is not elegant as we use a low level function of Cargo2nix
that enables us to pass our custom rustChannel object
that enables us to pass our custom rustChannel object. We need this low level definition
to pass Nix's Rust toolchains instead of Mozilla's one.
target is mandatory but must be kept to null to allow cargo2nix to set it to the appropriate value
for each crate.
*/
rustPkgs = pkgs.rustBuilder.makePackageSet {
inherit packageFun rustChannel release;
inherit packageFun rustChannel release codegenOpts;
packageOverrides = overrides;
target = null; /* we set target to null because we want that cargo2nix computes it automatically */
target = null;
buildRustPackages = pkgs.buildPackages.rustBuilder.makePackageSet {
inherit rustChannel packageFun;
inherit rustChannel packageFun codegenOpts;
packageOverrides = overrides;
target = null; /* we set target to null because we want that cargo2nix computes it automatically */
target = null;
};
};

View file

@ -261,10 +261,10 @@ have 66% chance of being stored by Venus and 33% chance of being stored by Mercu
Given the information above, we will configure our cluster as follow:
```bash
garage layout assign -z par1 -c 10 -t mercury 563e
garage layout assign -z par1 -c 20 -t venus 86f0
garage layout assign -z lon1 -c 20 -t earth 6814
garage layout assign -z bru1 -c 15 -t mars 212f
garage layout assign 563e -z par1 -c 10 -t mercury
garage layout assign 86f0 -z par1 -c 20 -t venus
garage layout assign 6814 -z lon1 -c 20 -t earth
garage layout assign 212f -z bru1 -c 15 -t mars
```
At this point, the changes in the cluster layout have not yet been applied.

View file

@ -29,6 +29,10 @@ bootstrap_peers = [
consul_host = "consul.service"
consul_service_name = "garage-daemon"
kubernetes_namespace = "garage"
kubernetes_service_name = "garage-daemon"
kubernetes_skip_crd = false
sled_cache_capacity = 134217728
sled_flush_every_ms = 2000
@ -181,6 +185,20 @@ RPC ports are announced.
Garage does not yet support talking to Consul over TLS.
### `kubernetes_namespace`, `kubernetes_service_name` and `kubernetes_skip_crd`
Garage supports discovering other nodes of the cluster using kubernetes custom
resources. For this to work `kubernetes_namespace` and `kubernetes_service_name`
need to be configured.
`kubernetes_namespace` sets the namespace in which the custom resources are
configured. `kubernetes_service_name` is added as a label to these resources to
filter them, to allow for multiple deployments in a single namespace.
`kubernetes_skip_crd` can be set to true to disable the automatic creation and
patching of the `garagenodes.deuxfleurs.fr` CRD. You will need to create the CRD
manually.
### `sled_cache_capacity`
This parameter can be used to tune the capacity of the cache used by

View file

@ -8,10 +8,10 @@ rec {
sha256 = "1xy9zpypqfxs5gcq5dcla4bfkhxmh5nzn9dyqkr03lqycm9wg5cr";
};
cargo2nixSrc = fetchGit {
# As of 2022-02-03
# As of 2022-03-17
url = "https://github.com/superboum/cargo2nix";
ref = "backward-compat";
rev = "08d963f32a774353ee8acf3f61749915875c1ec4";
ref = "main";
rev = "bcbf3ba99e9e01a61eb83a24624419c2dd9dec64";
};

View file

@ -18,6 +18,7 @@ let
pkgsHost = import pkgsSrc {};
lib = pkgsHost.lib;
kaniko = (import ./kaniko.nix) pkgsHost;
winscp = (import ./winscp.nix) pkgsHost;
in
lib.flatten (builtins.map (pkgs: [
pkgs.rustPlatform.rust.rustc
@ -25,5 +26,6 @@ in
pkgs.buildPackages.stdenv.cc
]) pkgsList) ++ [
kaniko
winscp
]

28
nix/winscp.nix Normal file
View file

@ -0,0 +1,28 @@
pkgs:
pkgs.stdenv.mkDerivation rec {
pname = "winscp";
version = "5.19.6";
src = pkgs.fetchzip {
url = "https://winscp.net/download/WinSCP-${version}-Portable.zip";
sha256 = "sha256-8+6JuT0b1fFQ6etaFTMSjIKvDGzmJoHAuByXiqCBzu0=";
stripRoot = false;
};
buildPhase = ''
cat > winscp <<EOF
#!${pkgs.bash}/bin/bash
WINEDEBUG=-all
${pkgs.winePackages.minimal}/bin/wine $out/opt/WinSCP.com
EOF
'';
installPhase = ''
mkdir -p $out/{bin,opt}
cp {WinSCP.com,WinSCP.exe} $out/opt
cp winscp $out/bin
chmod +x $out/bin/winscp
'';
}

View file

@ -44,6 +44,9 @@ root_domain = ".s3.garage.localhost"
bind_addr = "0.0.0.0:$((3920+$count))"
root_domain = ".web.garage.localhost"
index = "index.html"
[admin]
api_bind_addr = "0.0.0.0:$((9900+$count))"
EOF
echo -en "$LABEL configuration written to $CONF_PATH\n"

4
script/dev-env-winscp.sh Normal file
View file

@ -0,0 +1,4 @@
export AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
export AWS_SECRET_ACCESS_KEY=`cat /tmp/garage.s3 |cut -d' ' -f2`
export AWS_DEFAULT_REGION='garage'
export WINSCP_URL="s3://${AWS_ACCESS_KEY_ID}:${AWS_SECRET_ACCESS_KEY}@127.0.0.1:4443 -certificate=* -rawsettings S3DefaultRegion=garage S3UrlStyle=1"

View file

@ -0,0 +1,3 @@
COMPOSE_PROJECT_NAME=telemetry
OTEL_COLLECT_TAG=0.44.0
ELASTIC_BUNDLE_TAG=7.17.0

View file

@ -0,0 +1,10 @@
apm-server:
# Defines the host and port the server is listening on. Use "unix:/path/to.sock" to listen on a unix domain socket.
host: "0.0.0.0:8200"
#-------------------------- Elasticsearch output --------------------------
output.elasticsearch:
# Array of hosts to connect to.
# Scheme and port can be left out and will be set to the default (`http` and `9200`).
# In case you specify and additional path, the scheme is required: `http://localhost:9200/path`.
# IPv6 addresses should always be defined as: `https://[2001:db8::1]:9200`.
hosts: ["localhost:9200"]

View file

@ -0,0 +1,69 @@
version: "2"
services:
otel:
image: otel/opentelemetry-collector-contrib:${OTEL_COLLECT_TAG}
command: [ "--config=/etc/otel-config.yaml" ]
volumes:
- ./otel-config.yaml:/etc/otel-config.yaml
network_mode: "host"
elastic:
image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTIC_BUNDLE_TAG}
container_name: elastic
environment:
- "node.name=elastic"
- "http.port=9200"
- "cluster.name=es-docker-cluster"
- "discovery.type=single-node"
- "bootstrap.memory_lock=true"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
nofile: 65536
volumes:
- "es_data:/usr/share/elasticsearch/data"
network_mode: "host"
# kibana instance and collectors
# see https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-docker.html
kibana:
image: docker.elastic.co/kibana/kibana:${ELASTIC_BUNDLE_TAG}
container_name: kibana
environment:
SERVER_NAME: "kibana.local"
# ELASTICSEARCH_URL: "http://localhost:9700"
ELASTICSEARCH_HOSTS: "http://localhost:9200"
depends_on: [ 'elastic' ]
network_mode: "host"
apm:
image: docker.elastic.co/apm/apm-server:${ELASTIC_BUNDLE_TAG}
container_name: apm
volumes:
- "./apm-config.yaml:/usr/share/apm-server/apm-server.yml:ro"
depends_on: [ 'elastic' ]
network_mode: "host"
grafana:
# see https://grafana.com/docs/grafana/latest/installation/docker/
image: "grafana/grafana:8.3.5"
container_name: grafana
# restart: unless-stopped
environment:
- "GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-simple-json-datasource,grafana-piechart-panel,grafana-worldmap-panel,grafana-polystat-panel"
network_mode: "host"
volumes:
# chown 472:472 if needed
- grafana:/var/lib/grafana
- ./grafana/provisioning/:/etc/grafana/provisioning/
volumes:
es_data:
driver: local
grafana:
driver: local
metricbeat:
driver: local

View file

@ -0,0 +1,20 @@
apiVersion: 1
datasources:
- name: DS_ELASTICSEARCH
type: elasticsearch
access: proxy
url: http://elastic:9700
password: ''
user: ''
database: metricbeat-*
basicAuth: false
isDefault: true
jsonData:
esVersion: 70
logLevelField: ''
logMessageField: ''
maxConcurrentShardRequests: 5
timeField: "@timestamp"
timeInterval: 10s
readOnly: false

View file

@ -0,0 +1,47 @@
receivers:
# Data sources: metrics, traces
otlp:
protocols:
grpc:
endpoint: ":4317"
http:
endpoint: ":55681"
# Data sources: metrics
prometheus:
config:
scrape_configs:
- job_name: "garage"
scrape_interval: 5s
static_configs:
- targets: ["localhost:3909"]
exporters:
logging:
logLevel: info
# see https://www.elastic.co/guide/en/apm/get-started/current/open-telemetry-elastic.html#open-telemetry-collector
otlp/elastic:
endpoint: "localhost:8200"
tls:
insecure: true
processors:
batch:
extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679
service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, otlp/elastic]
metrics:
receivers: [otlp, prometheus]
processors: [batch]
exporters: [logging, otlp/elastic]

File diff suppressed because it is too large Load diff

View file

@ -116,295 +116,33 @@ if [ -z "$SKIP_DUCK" ]; then
done
fi
# Advanced testing via S3API
if [ -z "$SKIP_AWS" ]; then
echo "🔌 Test S3API"
echo "Test Objects"
aws s3api put-object --bucket eprouvette --key a
aws s3api put-object --bucket eprouvette --key a/a
aws s3api put-object --bucket eprouvette --key a/b
aws s3api put-object --bucket eprouvette --key a/c
aws s3api put-object --bucket eprouvette --key a/d/a
aws s3api put-object --bucket eprouvette --key a/é
aws s3api put-object --bucket eprouvette --key b
aws s3api put-object --bucket eprouvette --key c
aws s3api list-objects-v2 --bucket eprouvette >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 0 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 999999999 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --prefix 'a/' --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects-v2 --bucket eprouvette --start-after 'Z' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects-v2 --bucket eprouvette --start-after 'c' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-objects --bucket eprouvette >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
# @FIXME it does not work as expected but might be a limitation of aws s3api
# The problem is the conjunction of a delimiter + pagination + v1 of listobjects
#aws s3api list-objects --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
#[ $(jq '.Contents | length' $CMDOUT) == 3 ]
#[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' --delimiter '/' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 4 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --prefix 'a/' --page-size 1 >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
# @FIXME idem
#aws s3api list-objects --bucket eprouvette --prefix 'a/' --delimiter '/' --page-size 1 >$CMDOUT
#[ $(jq '.Contents | length' $CMDOUT) == 4 ]
#[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-objects --bucket eprouvette --starting-token 'Z' >$CMDOUT
[ $(jq '.Contents | length' $CMDOUT) == 8 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-objects --bucket eprouvette --starting-token 'c' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-objects-v2 --bucket eprouvette | \
jq -c '. | {Objects: [.Contents[] | {Key: .Key}], Quiet: true}' | \
aws s3api delete-objects --bucket eprouvette --delete file:///dev/stdin
echo "Test Multipart Upload"
aws s3api create-multipart-upload --bucket eprouvette --key a
aws s3api create-multipart-upload --bucket eprouvette --key a
aws s3api create-multipart-upload --bucket eprouvette --key c
aws s3api create-multipart-upload --bucket eprouvette --key c/a
aws s3api create-multipart-upload --bucket eprouvette --key c/b
aws s3api list-multipart-uploads --bucket eprouvette >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --delimiter '/' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 3 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --delimiter '/' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 1 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --prefix 'c' --delimiter '/' --page-size 1 >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 1 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 1 ]
aws s3api list-multipart-uploads --bucket eprouvette --starting-token 'ZZZZZ' >$CMDOUT
[ $(jq '.Uploads | length' $CMDOUT) == 5 ]
[ $(jq '.CommonPrefixes | length' $CMDOUT) == 0 ]
aws s3api list-multipart-uploads --bucket eprouvette --starting-token 'd' >$CMDOUT
! [ -s $CMDOUT ]
aws s3api list-multipart-uploads --bucket eprouvette | \
jq -r '.Uploads[] | "\(.Key) \(.UploadId)"' | \
while read r; do
key=$(echo $r|cut -d' ' -f 1);
uid=$(echo $r|cut -d' ' -f 2);
aws s3api abort-multipart-upload --bucket eprouvette --key $key --upload-id $uid;
echo "Deleted ${key}:${uid}"
done
echo "Test for ListParts"
UPLOAD_ID=$(aws s3api create-multipart-upload --bucket eprouvette --key list-parts | jq -r .UploadId)
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 0 ]
[ $(jq -r '.StorageClass' $CMDOUT) == 'STANDARD' ] # check that the result is not empty
ETAG1=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 1 --body /tmp/garage.2.rnd | jq .ETag)
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 1 ]
[ $(jq '.Parts[0].PartNumber' $CMDOUT) == 1 ]
[ $(jq '.Parts[0].Size' $CMDOUT) == 5242880 ]
[ $(jq '.Parts[0].ETag' $CMDOUT) == $ETAG1 ]
ETAG2=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 3 --body /tmp/garage.3.rnd | jq .ETag)
ETAG3=$(aws s3api upload-part --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --part-number 2 --body /tmp/garage.2.rnd | jq .ETag)
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 3 ]
[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID --page-size 1 >$CMDOUT
[ $(jq '.Parts | length' $CMDOUT) == 3 ]
[ $(jq '.Parts[1].ETag' $CMDOUT) == $ETAG3 ]
cat >/tmp/garage.multipart_struct <<EOF
{
"Parts": [
{
"ETag": $ETAG1,
"PartNumber": 1
},
{
"ETag": $ETAG3,
"PartNumber": 2
},
{
"ETag": $ETAG2,
"PartNumber": 3
}
]
}
if [ -z "$SKIP_WINSCP" ]; then
echo "🛠️ Testing with winscp"
source ${SCRIPT_FOLDER}/dev-env-winscp.sh
winscp <<EOF
open $WINSCP_URL
ls
mkdir eprouvette/winscp
EOF
aws s3api complete-multipart-upload \
--bucket eprouvette --key list-parts --upload-id $UPLOAD_ID \
--multipart-upload file:///tmp/garage.multipart_struct
! aws s3api list-parts --bucket eprouvette --key list-parts --upload-id $UPLOAD_ID >$CMDOUT
aws s3 rm "s3://eprouvette/list-parts"
# @FIXME We do not write tests with --starting-token due to a bug with awscli
# See here: https://github.com/aws/aws-cli/issues/6666
echo "Test for UploadPartCopy"
aws s3 cp "/tmp/garage.3.rnd" "s3://eprouvette/copy_part_source"
UPLOAD_ID=$(aws s3api create-multipart-upload --bucket eprouvette --key test_multipart | jq -r .UploadId)
PART1=$(aws s3api upload-part \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 1 \
--body /tmp/garage.2.rnd | jq .ETag)
PART2=$(aws s3api upload-part-copy \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 2 \
--copy-source "/eprouvette/copy_part_source" \
--copy-source-range "bytes=500-5000500" \
| jq .CopyPartResult.ETag)
PART3=$(aws s3api upload-part \
--bucket eprouvette --key test_multipart \
--upload-id $UPLOAD_ID --part-number 3 \
--body /tmp/garage.3.rnd | jq .ETag)
cat >/tmp/garage.multipart_struct <<EOF
{
"Parts": [
{
"ETag": $PART1,
"PartNumber": 1
},
{
"ETag": $PART2,
"PartNumber": 2
},
{
"ETag": $PART3,
"PartNumber": 3
}
]
}
for idx in {1..3}.{rnd,b64}; do
winscp <<EOF
open $WINSCP_URL
put Z:\\tmp\\garage.$idx eprouvette/winscp/garage.$idx.winscp
ls eprouvette/winscp/
get eprouvette/winscp/garage.$idx.winscp Z:\\tmp\\garage.$idx.dl
rm eprouvette/winscp/garage.$idx.winscp
EOF
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
done
winscp <<EOF
open $WINSCP_URL
rm eprouvette/winscp
EOF
aws s3api complete-multipart-upload \
--bucket eprouvette --key test_multipart --upload-id $UPLOAD_ID \
--multipart-upload file:///tmp/garage.multipart_struct
aws s3 cp "s3://eprouvette/test_multipart" /tmp/garage.test_multipart
cat /tmp/garage.2.rnd <(tail -c +501 /tmp/garage.3.rnd | head -c 5000001) /tmp/garage.3.rnd > /tmp/garage.test_multipart_reference
diff /tmp/garage.test_multipart /tmp/garage.test_multipart_reference >/tmp/garage.test_multipart_diff 2>&1
aws s3 rm "s3://eprouvette/copy_part_source"
aws s3 rm "s3://eprouvette/test_multipart"
rm /tmp/garage.multipart_struct
rm /tmp/garage.test_multipart
rm /tmp/garage.test_multipart_reference
rm /tmp/garage.test_multipart_diff
echo "Test CORS endpoints"
garage -c /tmp/config.1.toml bucket website --allow eprouvette
aws s3api put-object --bucket eprouvette --key index.html
CORS='{"CORSRules":[{"AllowedHeaders":["*"],"AllowedMethods":["GET","PUT"],"AllowedOrigins":["*"]}]}'
aws s3api put-bucket-cors --bucket eprouvette --cors-configuration $CORS
[ `aws s3api get-bucket-cors --bucket eprouvette | jq -c` == $CORS ]
curl -s -i -H 'Origin: http://example.com' --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ | grep access-control-allow-origin
curl -s -i -X OPTIONS -H 'Access-Control-Request-Method: PUT' -H 'Origin: http://example.com' --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ | grep access-control-allow-methods
curl -s -i -X OPTIONS -H 'Access-Control-Request-Method: DELETE' -H 'Origin: http://example.com' --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ | grep '403 Forbidden'
#@TODO we may want to test the S3 endpoint but we need to handle authentication, which is way more complex.
aws s3api delete-bucket-cors --bucket eprouvette
! [ -s `aws s3api get-bucket-cors --bucket eprouvette` ]
curl -s -i -X OPTIONS -H 'Access-Control-Request-Method: PUT' -H 'Origin: http://example.com' --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ | grep '403 Forbidden'
aws s3api delete-object --bucket eprouvette --key index.html
garage -c /tmp/config.1.toml bucket website --deny eprouvette
fi
rm /tmp/garage.{1..3}.{rnd,b64}
if [ -z "$SKIP_AWS" ]; then
echo "🪣 Test bucket logic "
AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
[ $(aws s3 ls | wc -l) == 1 ]
garage -c /tmp/config.1.toml bucket create seau
garage -c /tmp/config.1.toml bucket allow --read seau --key $AWS_ACCESS_KEY_ID
[ $(aws s3 ls | wc -l) == 2 ]
garage -c /tmp/config.1.toml bucket deny --read seau --key $AWS_ACCESS_KEY_ID
[ $(aws s3 ls | wc -l) == 1 ]
garage -c /tmp/config.1.toml bucket allow --read seau --key $AWS_ACCESS_KEY_ID
[ $(aws s3 ls | wc -l) == 2 ]
garage -c /tmp/config.1.toml bucket delete --yes seau
[ $(aws s3 ls | wc -l) == 1 ]
fi
if [ -z "$SKIP_AWS" ]; then
echo "🧪 Website Testing"
echo "<h1>hello world</h1>" > /tmp/garage-index.html
aws s3 cp /tmp/garage-index.html s3://eprouvette/index.html
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ ` == 404 ]
garage -c /tmp/config.1.toml bucket website --allow eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ ` == 200 ]
garage -c /tmp/config.1.toml bucket website --deny eprouvette
[ `curl -s -o /dev/null -w "%{http_code}" --header "Host: eprouvette.web.garage.localhost" http://127.0.0.1:3921/ ` == 404 ]
aws s3 rm s3://eprouvette/index.html
rm /tmp/garage-index.html
fi
echo "🏁 Teardown"
AWS_ACCESS_KEY_ID=`cat /tmp/garage.s3 |cut -d' ' -f1`
AWS_SECRET_ACCESS_KEY=`cat /tmp/garage.s3 |cut -d' ' -f2`

View file

@ -13,6 +13,7 @@ let
overlays = [ cargo2nixOverlay ];
};
kaniko = (import ./nix/kaniko.nix) pkgs;
winscp = (import ./nix/winscp.nix) pkgs;
in
@ -76,10 +77,13 @@ function refresh_toolchain {
pkgs.rustPlatform.rust.cargo
pkgs.clippy
pkgs.rustfmt
pkgs.perl
pkgs.protobuf
cargo2nix.packages.x86_64-linux.cargo2nix
] else [])
++
(if integration then [
(if integration then [
winscp
pkgs.s3cmd
pkgs.awscli2
pkgs.minio-client

29
src/admin/Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
[package]
name = "garage_admin"
version = "0.7.0"
authors = ["Maximilien Richer <code@mricher.fr>"]
edition = "2018"
license = "AGPL-3.0"
description = "Administration and metrics REST HTTP server for Garage"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
[lib]
path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
hex = "0.4"
futures = "0.3"
futures-util = "0.3"
http = "0.2"
hyper = "0.14"
tracing = "0.1.30"
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10"
opentelemetry-otlp = "0.10"
prometheus = "0.13"

6
src/admin/lib.rs Normal file
View file

@ -0,0 +1,6 @@
//! Crate for handling the admin and metric HTTP APIs
#[macro_use]
extern crate tracing;
pub mod metrics;
pub mod tracing_setup;

146
src/admin/metrics.rs Normal file
View file

@ -0,0 +1,146 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::SystemTime;
use futures::future::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use opentelemetry::{
global,
metrics::{BoundCounter, BoundValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context,
};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_util::error::Error as GarageError;
use garage_util::metrics::*;
// serve_req on metric endpoint
async fn serve_req(
req: Request<Body>,
admin_server: Arc<AdminServer>,
) -> Result<Response<Body>, hyper::Error> {
info!("Receiving request at path {}", req.uri());
let request_start = SystemTime::now();
admin_server.metrics.http_counter.add(1);
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
admin_server.exporter.registry().gather()
});
encoder.encode(&metric_families, &mut buffer).unwrap();
admin_server
.metrics
.http_body_gauge
.record(buffer.len() as u64);
Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
_ => Response::builder()
.status(404)
.body(Body::from("Not implemented"))
.unwrap(),
};
admin_server
.metrics
.http_req_histogram
.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(response)
}
// AdminServer hold the admin server internal admin_server and the metric exporter
pub struct AdminServer {
exporter: PrometheusExporter,
metrics: AdminServerMetrics,
}
// GarageMetricadmin_server holds the metrics counter definition for Garage
// FIXME: we would rather have that split up among the different libraries?
struct AdminServerMetrics {
http_counter: BoundCounter<u64>,
http_body_gauge: BoundValueRecorder<u64>,
http_req_histogram: BoundValueRecorder<f64>,
}
impl AdminServer {
/// init initilialize the AdminServer and background metric server
pub fn init() -> AdminServer {
let exporter = opentelemetry_prometheus::exporter().init();
let meter = global::meter("garage/admin_server");
AdminServer {
exporter,
metrics: AdminServerMetrics {
http_counter: meter
.u64_counter("admin.http_requests_total")
.with_description("Total number of HTTP requests made.")
.init()
.bind(&[]),
http_body_gauge: meter
.u64_value_recorder("admin.http_response_size_bytes")
.with_description("The metrics HTTP response sizes in bytes.")
.init()
.bind(&[]),
http_req_histogram: meter
.f64_value_recorder("admin.http_request_duration_seconds")
.with_description("The HTTP request latencies in seconds.")
.init()
.bind(&[]),
},
}
}
/// run execute the admin server on the designated HTTP port and listen for requests
pub async fn run(
self,
bind_addr: SocketAddr,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let admin_server = Arc::new(self);
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(move |_conn| {
let admin_server = admin_server.clone();
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder("admin/request")
.with_trace_id(gen_trace_id())
.start(&tracer);
serve_req(req, admin_server.clone())
.with_context(Context::current_with_span(span))
}))
}
});
let server = Server::bind(&bind_addr).serve(make_svc);
let graceful = server.with_graceful_shutdown(shutdown_signal);
info!("Admin server listening on http://{}", bind_addr);
graceful.await?;
Ok(())
}
}

View file

@ -0,0 +1,37 @@
use std::time::Duration;
use opentelemetry::sdk::{
trace::{self, IdGenerator, Sampler},
Resource,
};
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use garage_util::data::*;
use garage_util::error::*;
pub fn init_tracing(export_to: &str, node_id: Uuid) -> Result<(), Error> {
let node_id = hex::encode(&node_id.as_slice()[..8]);
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(export_to)
.with_timeout(Duration::from_secs(3)),
)
.with_trace_config(
trace::config()
.with_id_generator(IdGenerator::default())
.with_sampler(Sampler::AlwaysOn)
.with_resource(Resource::new(vec![
KeyValue::new("service.name", "garage"),
KeyValue::new("service.instance.id", node_id),
])),
)
.install_batch(opentelemetry::runtime::Tokio)
.ok_or_message("Unable to initialize tracing")?;
Ok(())
}

View file

@ -1,6 +1,6 @@
[package]
name = "garage_api"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -14,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_model = { version = "0.6.0", path = "../model" }
garage_table = { version = "0.6.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" }
garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
base64 = "0.13"
bytes = "1.0"
@ -26,7 +26,7 @@ err-derive = "0.3"
hex = "0.4"
hmac = "0.10"
idna = "0.2"
log = "0.4"
tracing = "0.1.30"
md-5 = "0.9"
nom = "7.1"
sha2 = "0.9"
@ -49,3 +49,5 @@ serde_bytes = "0.11"
serde_json = "1.0"
quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"

View file

@ -7,8 +7,16 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_model::garage::Garage;
use garage_model::key_table::Key;
@ -30,6 +38,34 @@ use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*;
struct ApiMetrics {
request_counter: Counter<u64>,
error_counter: Counter<u64>,
request_duration: ValueRecorder<f64>,
}
impl ApiMetrics {
fn new() -> Self {
let meter = global::meter("garage/api");
Self {
request_counter: meter
.u64_counter("api.request_counter")
.with_description("Number of API calls to the various S3 API endpoints")
.init(),
error_counter: meter
.u64_counter("api.error_counter")
.with_description(
"Number of API calls to the various S3 API endpoints that resulted in errors",
)
.init(),
request_duration: meter
.f64_value_recorder("api.request_duration")
.with_description("Duration of API calls to the various S3 API endpoints")
.init(),
}
}
}
/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
@ -37,13 +73,19 @@ pub async fn run_api_server(
) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr;
let metrics = Arc::new(ApiMetrics::new());
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr();
async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone();
handler(garage, req, client_addr)
let metrics = metrics.clone();
handler(garage, metrics, req, client_addr)
}))
}
});
@ -59,13 +101,29 @@ pub async fn run_api_server(
async fn handler(
garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
info!("{} {} {}", addr, req.method(), uri);
debug!("{:?}", req);
match handler_inner(garage.clone(), req).await {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(gen_trace_id())
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
let res = handler_stage2(garage.clone(), metrics, req)
.with_context(Context::current_with_span(span))
.await;
match res {
Ok(x) => {
debug!("{} {:?}", x.status(), x.headers());
Ok(x)
@ -92,7 +150,11 @@ async fn handler(
}
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
async fn handler_stage2(
garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let authority = req
.headers()
.get(header::HOST)
@ -111,9 +173,53 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
debug!("Endpoint: {:?}", endpoint);
if let Endpoint::PostObject {} = endpoint {
let current_context = Context::current();
let current_span = current_context.span();
current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
current_span.set_attribute(KeyValue::new(
"bucket",
bucket_name.clone().unwrap_or_default(),
));
let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
let res = handler_stage3(garage, req, endpoint, bucket_name)
.record_duration(&metrics.request_duration, &metrics_tags[..])
.await;
metrics.request_counter.add(1, &metrics_tags[..]);
let status_code = match &res {
Ok(r) => r.status(),
Err(e) => e.http_status_code(),
};
if status_code.is_client_error() || status_code.is_server_error() {
metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", status_code.as_str().to_string()),
],
);
}
res
}
async fn handler_stage3(
garage: Arc<Garage>,
req: Request<Body>,
endpoint: Endpoint,
bucket_name: Option<String>,
) -> Result<Response<Body>, Error> {
// Some endpoints are processed early, before we even check for an API key
if let Endpoint::PostObject = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
if let Endpoint::Options = endpoint {
return handle_options_s3api(garage, &req, bucket_name).await;
}
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
@ -161,7 +267,6 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
};
let resp = match endpoint {
Endpoint::Options => handle_options(&req, &bucket).await,
Endpoint::HeadObject {
key, part_number, ..
} => handle_head(garage, &req, bucket_id, &key, part_number).await,

View file

@ -1,6 +1,6 @@
//! Crate for serving a S3 compatible API
#[macro_use]
extern crate log;
extern crate tracing;
pub mod error;
pub use error::Error;

View file

@ -300,7 +300,12 @@ pub async fn handle_upload_part_copy(
current_offset = block_end;
}
trace!("UploadPartCopy will copy {} bytes", size_to_copy);
if size_to_copy < 1024 * 1024 {
return Err(Error::BadRequest(format!(
"Not enough data to copy: {} bytes (minimum: 1MB)",
size_to_copy
)));
}
// Now, actually copy the blocks
let mut md5hasher = Md5::new();

View file

@ -100,7 +100,63 @@ pub async fn handle_put_cors(
.body(Body::empty())?)
}
pub async fn handle_options(req: &Request<Body>, bucket: &Bucket) -> Result<Response<Body>, Error> {
pub async fn handle_options_s3api(
garage: Arc<Garage>,
req: &Request<Body>,
bucket_name: Option<String>,
) -> Result<Response<Body>, Error> {
// FIXME: CORS rules of buckets with local aliases are
// not taken into account.
// If the bucket name is a global bucket name,
// we try to apply the CORS rules of that bucket.
// If a user has a local bucket name that has
// the same name, its CORS rules won't be applied
// and will be shadowed by the rules of the globally
// existing bucket (but this is inevitable because
// OPTIONS calls are not auhtenticated).
if let Some(bn) = bucket_name {
let helper = garage.bucket_helper();
let bucket_id = helper.resolve_global_bucket_name(&bn).await?;
if let Some(id) = bucket_id {
let bucket = garage
.bucket_table
.get(&EmptyKey, &id)
.await?
.filter(|b| !b.state.is_deleted())
.ok_or(Error::NoSuchBucket)?;
handle_options_for_bucket(req, &bucket)
} else {
// If there is a bucket name in the request, but that name
// does not correspond to a global alias for a bucket,
// then it's either a non-existing bucket or a local bucket.
// We have no way of knowing, because the request is not
// authenticated and thus we can't resolve local aliases.
// We take the permissive approach of allowing everything,
// because we don't want to prevent web apps that use
// local bucket names from making API calls.
Ok(Response::builder()
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "*")
.status(StatusCode::OK)
.body(Body::empty())?)
}
} else {
// If there is no bucket name in the request,
// we are doing a ListBuckets call, which we want to allow
// for all origins.
Ok(Response::builder()
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
.status(StatusCode::OK)
.body(Body::empty())?)
}
}
pub fn handle_options_for_bucket(
req: &Request<Body>,
bucket: &Bucket,
) -> Result<Response<Body>, Error> {
let origin = req
.headers()
.get("Origin")
@ -144,12 +200,7 @@ pub fn find_matching_cors_rule<'a>(
None => vec![],
};
return Ok(cors_config.iter().find(|rule| {
cors_rule_matches(
rule,
origin,
&req.method().to_string(),
request_headers.iter(),
)
cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
}));
}
}

View file

@ -1042,12 +1042,12 @@ mod tests {
query.common.prefix = "a/".to_string();
assert_eq!(
common_prefix(&objs.get(0).unwrap(), &query.common),
common_prefix(objs.get(0).unwrap(), &query.common),
Some("a/b/")
);
query.common.prefix = "a/b/".to_string();
assert_eq!(common_prefix(&objs.get(0).unwrap(), &query.common), None);
assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None);
}
#[test]
@ -1272,7 +1272,7 @@ mod tests {
Version {
bucket_id: uuid,
key: "a".to_string(),
uuid: uuid,
uuid,
deleted: false.into(),
blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
parts_etags: crdt::Map::<u64, String>::from_iter(etags),

View file

@ -414,8 +414,7 @@ pub enum Endpoint {
// It's intended to be used with HTML forms, using a multipart/form-data body.
// It works a lot like presigned requests, but everything is in the form instead
// of being query parameters of the URL, so authenticating it is a bit different.
PostObject {
},
PostObject,
}}
impl Endpoint {
@ -430,7 +429,11 @@ impl Endpoint {
let path = uri.path().trim_start_matches('/');
let query = uri.query();
if bucket.is_none() && path.is_empty() {
return Ok((Self::ListBuckets, None));
if *req.method() == Method::OPTIONS {
return Ok((Self::Options, None));
} else {
return Ok((Self::ListBuckets, None));
}
}
let (bucket, key) = if let Some(bucket) = bucket {

View file

@ -259,8 +259,7 @@ impl RoutingRuleInner {
let has_prefix = self
.condition
.as_ref()
.map(|c| c.prefix.as_ref())
.flatten()
.and_then(|c| c.prefix.as_ref())
.is_some();
self.redirect.validate(has_prefix)
}

View file

@ -51,7 +51,7 @@ pub async fn check_payload_signature(
let canonical_request = canonical_request(
request.method(),
&request.uri().path().to_string(),
request.uri().path(),
&canonical_query_string(request.uri()),
&headers,
&authorization.signed_headers,
@ -60,6 +60,9 @@ pub async fn check_payload_signature(
let (_, scope) = parse_credential(&authorization.credential)?;
let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request);
trace!("canonical request:\n{}", canonical_request);
trace!("string to sign:\n{}", string_to_sign);
let key = verify_v4(
garage,
&authorization.credential,

View file

@ -1,6 +1,6 @@
[package]
name = "garage"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -21,17 +21,18 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_api = { version = "0.6.0", path = "../api" }
garage_model = { version = "0.6.0", path = "../model" }
garage_rpc = { version = "0.6.0", path = "../rpc" }
garage_table = { version = "0.6.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" }
garage_web = { version = "0.6.0", path = "../web" }
garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0"
git-version = "0.3.4"
hex = "0.4"
log = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
rand = "0.8"
async-trait = "0.1.7"
@ -49,11 +50,13 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4.1"
[dev-dependencies]
aws-sdk-s3 = "0.6"
http = "0.2"
hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
static_init = "1.0"

View file

@ -1,7 +1,5 @@
use std::path::PathBuf;
use log::warn;
use garage_util::error::*;
pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)";

View file

@ -2,7 +2,7 @@
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
#[macro_use]
extern crate log;
extern crate tracing;
mod admin;
mod cli;
@ -55,7 +55,7 @@ struct Opt {
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "garage=info")
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
pretty_env_logger::init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
@ -106,7 +106,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(network_key, sk);
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk);
// Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host {
@ -115,7 +115,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() {
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) {
(node_id, a)
} else {
let default_addr = SocketAddr::new(

View file

@ -6,6 +6,8 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_admin::metrics::*;
use garage_admin::tracing_setup::*;
use garage_api::run_api_server;
use garage_model::garage::Garage;
use garage_web::run_web_server;
@ -34,6 +36,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
info!("Initialize admin web server and metric backend...");
let admin_server_init = AdminServer::init();
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@ -41,9 +46,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background);
info!("Initialize tracing...");
if let Some(export_to) = config.admin.trace_sink {
init_tracing(&export_to, garage.system.id)?;
}
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Crate admin RPC handler...");
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone());
info!("Initializing API server...");
@ -58,6 +68,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()),
));
let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
info!("Configure and run admin web server...");
Some(tokio::spawn(
admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
))
} else {
None
};
// Stuff runs
// When a cancel signal is sent, stuff stops
@ -67,6 +86,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
}
if let Some(a) = admin_server {
if let Err(e) = a.await? {
warn!("Admin web server exited with error: {}", e);
}
}
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();

74
src/garage/tests/admin.rs Normal file
View file

@ -0,0 +1,74 @@
use crate::common;
use crate::common::ext::*;
const BCKT_NAME: &str = "seau";
#[tokio::test]
async fn test_admin_bucket_perms() {
let ctx = common::context();
let hb = || ctx.client.head_bucket().bucket(BCKT_NAME).send();
assert!(hb().await.is_err());
ctx.garage
.command()
.args(["bucket", "create", BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
assert!(hb().await.is_err());
ctx.garage
.command()
.args([
"bucket",
"allow",
"--read",
"--key",
&ctx.garage.key.id,
BCKT_NAME,
])
.quiet()
.expect_success_status("Could not create bucket");
assert!(hb().await.is_ok());
ctx.garage
.command()
.args([
"bucket",
"deny",
"--read",
"--key",
&ctx.garage.key.name,
BCKT_NAME,
])
.quiet()
.expect_success_status("Could not create bucket");
assert!(hb().await.is_err());
ctx.garage
.command()
.args([
"bucket",
"allow",
"--read",
"--key",
&ctx.garage.key.name,
BCKT_NAME,
])
.quiet()
.expect_success_status("Could not create bucket");
assert!(hb().await.is_ok());
ctx.garage
.command()
.args(["bucket", "delete", "--yes", BCKT_NAME])
.quiet()
.expect_success_status("Could not delete bucket");
assert!(hb().await.is_err());
}

View file

@ -0,0 +1,87 @@
use crate::common;
use aws_sdk_s3::model::BucketLocationConstraint;
use aws_sdk_s3::output::DeleteBucketOutput;
#[tokio::test]
async fn test_bucket_all() {
let ctx = common::context();
let bucket_name = "hello";
{
// Create bucket
//@TODO check with an invalid bucket name + with an already existing bucket
let r = ctx
.client
.create_bucket()
.bucket(bucket_name)
.send()
.await
.unwrap();
assert_eq!(r.location.unwrap(), "/hello");
}
{
// List buckets
let r = ctx.client.list_buckets().send().await.unwrap();
assert!(r
.buckets
.as_ref()
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
.find(|x| x.name.as_ref().unwrap() == "hello")
.is_some());
}
{
// Get its location
let r = ctx
.client
.get_bucket_location()
.bucket(bucket_name)
.send()
.await
.unwrap();
match r.location_constraint.unwrap() {
BucketLocationConstraint::Unknown(v) if v.as_str() == "garage-integ-test" => (),
_ => unreachable!("wrong region"),
}
}
{
// (Stub) check GetVersioning
let r = ctx
.client
.get_bucket_versioning()
.bucket(bucket_name)
.send()
.await
.unwrap();
assert!(r.status.is_none());
}
{
// Delete bucket
// @TODO add a check with a non-empty bucket and check failure
let r = ctx
.client
.delete_bucket()
.bucket(bucket_name)
.send()
.await
.unwrap();
assert_eq!(r, DeleteBucketOutput::builder().build());
}
{
// Check bucket is deleted with List buckets
let r = ctx.client.list_buckets().send().await.unwrap();
assert!(r
.buckets
.as_ref()
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
.find(|x| x.name.as_ref().unwrap() == "hello")
.is_none());
}
}

View file

@ -6,7 +6,7 @@ use std::sync::Once;
use super::ext::*;
// https://xkcd.com/221/
const DEFAULT_PORT: u16 = 49995;
pub const DEFAULT_PORT: u16 = 49995;
static GARAGE_TEST_SECRET: &str =
"c3ea8cb80333d04e208d136698b1a01ae370d463f0d435ab2177510b3478bf44";
@ -65,6 +65,9 @@ root_domain = ".s3.garage"
bind_addr = "127.0.0.1:{web_port}"
root_domain = ".web.garage"
index = "index.html"
[admin]
api_bind_addr = "127.0.0.1:{admin_port}"
"#,
path = path.display(),
secret = GARAGE_TEST_SECRET,
@ -72,6 +75,7 @@ index = "index.html"
api_port = port,
rpc_port = port + 1,
web_port = port + 2,
admin_port = port + 3,
);
fs::write(path.join("config.toml"), config).expect("Could not write garage config file");

View file

@ -1,4 +1,10 @@
#[macro_use]
mod common;
mod admin;
mod bucket;
mod list;
mod multipart;
mod objects;
mod simple;
mod website;

615
src/garage/tests/list.rs Normal file
View file

@ -0,0 +1,615 @@
use crate::common;
const KEYS: [&str; 8] = ["a", "a/a", "a/b", "a/c", "a/d/a", "a/é", "b", "c"];
const KEYS_MULTIPART: [&str; 5] = ["a", "a", "c", "c/a", "c/b"];
#[tokio::test]
async fn test_listobjectsv2() {
let ctx = common::context();
let bucket = ctx.create_bucket("listobjectsv2");
for k in KEYS {
ctx.client
.put_object()
.bucket(&bucket)
.key(k)
.send()
.await
.unwrap();
}
{
// Scoping the variable to avoid reusing it
// in a following assert due to copy paste
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 8);
assert!(r.common_prefixes.is_none());
}
//@FIXME aws-sdk-s3 automatically checks max-key values.
// If we set it to zero, it drops it, and it is probably
// the same behavior on values bigger than 1000.
// Boto and awscli do not perform these tests, we should write
// our own minimal library to bypass AWS SDK's tests and be
// sure that we behave correctly.
{
// With 2 elements
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.max_keys(2)
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 2);
assert!(r.common_prefixes.is_none());
assert!(r.next_continuation_token.is_some());
}
{
// With pagination
let mut cnt = 0;
let mut next = None;
let last_idx = KEYS.len() - 1;
for i in 0..KEYS.len() {
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.set_continuation_token(next)
.max_keys(1)
.send()
.await
.unwrap();
cnt += 1;
next = r.next_continuation_token;
assert_eq!(r.contents.unwrap().len(), 1);
assert!(r.common_prefixes.is_none());
if i != last_idx {
assert!(next.is_some());
}
}
assert_eq!(cnt, KEYS.len());
}
{
// With a delimiter
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 3);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With a delimiter and pagination
let mut cnt_pfx = 0;
let mut cnt_key = 0;
let mut next = None;
for _i in 0..KEYS.len() {
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.set_continuation_token(next)
.delimiter("/")
.max_keys(1)
.send()
.await
.unwrap();
next = r.next_continuation_token;
match (r.contents, r.common_prefixes) {
(Some(k), None) if k.len() == 1 => cnt_key += 1,
(None, Some(pfx)) if pfx.len() == 1 => cnt_pfx += 1,
_ => unreachable!("logic error"),
};
if next.is_none() {
break;
}
}
assert_eq!(cnt_key, 3);
assert_eq!(cnt_pfx, 1);
}
{
// With a prefix
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.prefix("a/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 5);
assert!(r.common_prefixes.is_none());
}
{
// With a prefix and a delimiter
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.prefix("a/")
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 4);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With a prefix, a delimiter and max_key
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.prefix("a/")
.delimiter("/")
.max_keys(1)
.send()
.await
.unwrap();
assert_eq!(r.contents.as_ref().unwrap().len(), 1);
assert_eq!(
r.contents
.unwrap()
.first()
.unwrap()
.key
.as_ref()
.unwrap()
.as_str(),
"a/a"
);
assert!(r.common_prefixes.is_none());
}
{
// With start_after before all keys
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.start_after("Z")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 8);
assert!(r.common_prefixes.is_none());
}
{
// With start_after after all keys
let r = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.start_after("c")
.send()
.await
.unwrap();
assert!(r.contents.is_none());
assert!(r.common_prefixes.is_none());
}
}
#[tokio::test]
async fn test_listobjectsv1() {
let ctx = common::context();
let bucket = ctx.create_bucket("listobjects");
for k in KEYS {
ctx.client
.put_object()
.bucket(&bucket)
.key(k)
.send()
.await
.unwrap();
}
{
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 8);
assert!(r.common_prefixes.is_none());
}
{
// With 2 elements
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.max_keys(2)
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 2);
assert!(r.common_prefixes.is_none());
assert!(r.next_marker.is_some());
}
{
// With pagination
let mut cnt = 0;
let mut next = None;
let last_idx = KEYS.len() - 1;
for i in 0..KEYS.len() {
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.set_marker(next)
.max_keys(1)
.send()
.await
.unwrap();
cnt += 1;
next = r.next_marker;
assert_eq!(r.contents.unwrap().len(), 1);
assert!(r.common_prefixes.is_none());
if i != last_idx {
assert!(next.is_some());
}
}
assert_eq!(cnt, KEYS.len());
}
{
// With a delimiter
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 3);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With a delimiter and pagination
let mut cnt_pfx = 0;
let mut cnt_key = 0;
let mut next = None;
for _i in 0..KEYS.len() {
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.delimiter("/")
.set_marker(next)
.max_keys(1)
.send()
.await
.unwrap();
next = r.next_marker;
match (r.contents, r.common_prefixes) {
(Some(k), None) if k.len() == 1 => cnt_key += 1,
(None, Some(pfx)) if pfx.len() == 1 => cnt_pfx += 1,
_ => unreachable!("logic error"),
};
if next.is_none() {
break;
}
}
assert_eq!(cnt_key, 3);
// We have no optimization to skip the whole prefix
// on listobjectsv1 so we return the same one 5 times,
// for each element. It is up to the client to merge its result.
// This is compliant with AWS spec.
assert_eq!(cnt_pfx, 5);
}
{
// With a prefix
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.prefix("a/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 5);
assert!(r.common_prefixes.is_none());
}
{
// With a prefix and a delimiter
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.prefix("a/")
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 4);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With a prefix, a delimiter and max_key
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.prefix("a/")
.delimiter("/")
.max_keys(1)
.send()
.await
.unwrap();
assert_eq!(r.contents.as_ref().unwrap().len(), 1);
assert_eq!(
r.contents
.unwrap()
.first()
.unwrap()
.key
.as_ref()
.unwrap()
.as_str(),
"a/a"
);
assert!(r.common_prefixes.is_none());
}
{
// With marker before all keys
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.marker("Z")
.send()
.await
.unwrap();
assert_eq!(r.contents.unwrap().len(), 8);
assert!(r.common_prefixes.is_none());
}
{
// With start_after after all keys
let r = ctx
.client
.list_objects()
.bucket(&bucket)
.marker("c")
.send()
.await
.unwrap();
assert!(r.contents.is_none());
assert!(r.common_prefixes.is_none());
}
}
#[tokio::test]
async fn test_listmultipart() {
let ctx = common::context();
let bucket = ctx.create_bucket("listmultipartuploads");
for k in KEYS_MULTIPART {
ctx.client
.create_multipart_upload()
.bucket(&bucket)
.key(k)
.send()
.await
.unwrap();
}
{
// Default
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 5);
assert!(r.common_prefixes.is_none());
}
{
// With pagination
let mut next = None;
let mut upnext = None;
let last_idx = KEYS_MULTIPART.len() - 1;
for i in 0..KEYS_MULTIPART.len() {
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.set_key_marker(next)
.set_upload_id_marker(upnext)
.max_uploads(1)
.send()
.await
.unwrap();
next = r.next_key_marker;
upnext = r.next_upload_id_marker;
assert_eq!(r.uploads.unwrap().len(), 1);
assert!(r.common_prefixes.is_none());
if i != last_idx {
assert!(next.is_some());
}
}
}
{
// With delimiter
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 3);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With delimiter and pagination
let mut next = None;
let mut upnext = None;
let mut upcnt = 0;
let mut pfxcnt = 0;
let mut loopcnt = 0;
while loopcnt < KEYS_MULTIPART.len() {
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.delimiter("/")
.max_uploads(1)
.set_key_marker(next)
.set_upload_id_marker(upnext)
.send()
.await
.unwrap();
next = r.next_key_marker;
upnext = r.next_upload_id_marker;
loopcnt += 1;
upcnt += r.uploads.unwrap_or_default().len();
pfxcnt += r.common_prefixes.unwrap_or_default().len();
if next.is_none() {
break;
}
}
assert_eq!(upcnt + pfxcnt, loopcnt);
assert_eq!(upcnt, 3);
assert_eq!(pfxcnt, 1);
}
{
// With prefix
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.prefix("c")
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 3);
assert!(r.common_prefixes.is_none());
}
{
// With prefix and delimiter
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.prefix("c")
.delimiter("/")
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 1);
assert_eq!(r.common_prefixes.unwrap().len(), 1);
}
{
// With prefix, delimiter and max keys
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.prefix("c")
.delimiter("/")
.max_uploads(1)
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 1);
assert!(r.common_prefixes.is_none());
}
{
// With starting token before the first element
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.key_marker("ZZZZZ")
.send()
.await
.unwrap();
assert_eq!(r.uploads.unwrap().len(), 5);
assert!(r.common_prefixes.is_none());
}
{
// With starting token after the last element
let r = ctx
.client
.list_multipart_uploads()
.bucket(&bucket)
.key_marker("d")
.send()
.await
.unwrap();
assert!(r.uploads.is_none());
assert!(r.common_prefixes.is_none());
}
}

View file

@ -0,0 +1,415 @@
use crate::common;
use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::ByteStream;
const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;
#[tokio::test]
async fn test_uploadlistpart() {
let ctx = common::context();
let bucket = ctx.create_bucket("uploadpart");
let u1 = vec![0xee; SZ_5MB];
let u2 = vec![0x11; SZ_5MB];
let up = ctx
.client
.create_multipart_upload()
.bucket(&bucket)
.key("a")
.send()
.await
.unwrap();
let uid = up.upload_id.as_ref().unwrap();
assert!(up.upload_id.is_some());
{
let r = ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.send()
.await
.unwrap();
assert!(r.parts.is_none());
}
let p1 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.part_number(2)
.body(ByteStream::from(u1))
.send()
.await
.unwrap();
{
// ListPart on 1st element
let r = ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.send()
.await
.unwrap();
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 1);
let fp = ps.iter().find(|x| x.part_number == 2).unwrap();
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3366bb9dcf710d6801b5926467d02e19\""
);
assert_eq!(fp.size, SZ_5MB as i64);
}
let p2 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.part_number(1)
.body(ByteStream::from(u2))
.send()
.await
.unwrap();
{
// ListPart on the 2 elements
let r = ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.send()
.await
.unwrap();
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 2);
let fp = ps.iter().find(|x| x.part_number == 1).unwrap();
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3c484266f9315485694556e6c693bfa2\""
);
assert_eq!(fp.size, SZ_5MB as i64);
}
{
// Call pagination
let r = ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.max_parts(1)
.send()
.await
.unwrap();
assert!(r.part_number_marker.is_none());
assert!(r.next_part_number_marker.is_some());
assert_eq!(r.max_parts, 1_i32);
assert!(r.is_truncated);
assert_eq!(r.key.unwrap(), "a");
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
assert_eq!(r.parts.unwrap().len(), 1);
let r2 = ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.max_parts(1)
.part_number_marker(r.next_part_number_marker.as_ref().unwrap())
.send()
.await
.unwrap();
assert_eq!(
r2.part_number_marker.as_ref().unwrap(),
r.next_part_number_marker.as_ref().unwrap()
);
assert_eq!(r2.max_parts, 1_i32);
assert!(r2.is_truncated);
assert_eq!(r2.key.unwrap(), "a");
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
assert_eq!(r2.parts.unwrap().len(), 1);
}
let cmp = CompletedMultipartUpload::builder()
.parts(
CompletedPart::builder()
.part_number(1)
.e_tag(p2.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(2)
.e_tag(p1.e_tag.unwrap())
.build(),
)
.build();
ctx.client
.complete_multipart_upload()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.multipart_upload(cmp)
.send()
.await
.unwrap();
// The multipart upload must not appear anymore
assert!(ctx
.client
.list_parts()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.send()
.await
.is_err());
{
// The object must appear as a regular object
let r = ctx
.client
.head_object()
.bucket(&bucket)
.key("a")
.send()
.await
.unwrap();
assert_eq!(r.content_length, (SZ_5MB * 2) as i64);
}
}
#[tokio::test]
async fn test_uploadpartcopy() {
let ctx = common::context();
let bucket = ctx.create_bucket("uploadpartcopy");
let u1 = vec![0x11; SZ_10MB];
let u2 = vec![0x22; SZ_5MB];
let u3 = vec![0x33; SZ_5MB];
let u4 = vec![0x44; SZ_5MB];
let u5 = vec![0x55; SZ_5MB];
let overflow = 5500000 - SZ_5MB;
let mut exp_obj = u3.clone();
exp_obj.extend(&u4[500..]);
exp_obj.extend(&u5[..overflow + 1]);
exp_obj.extend(&u2);
exp_obj.extend(&u1[500..5500000 + 1]);
// (setup) Upload a single part object
ctx.client
.put_object()
.bucket(&bucket)
.key("source1")
.body(ByteStream::from(u1))
.send()
.await
.unwrap();
// (setup) Upload a multipart object with 2 parts
{
let up = ctx
.client
.create_multipart_upload()
.bucket(&bucket)
.key("source2")
.send()
.await
.unwrap();
let uid = up.upload_id.as_ref().unwrap();
let p1 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("source2")
.upload_id(uid)
.part_number(1)
.body(ByteStream::from(u4))
.send()
.await
.unwrap();
let p2 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("source2")
.upload_id(uid)
.part_number(2)
.body(ByteStream::from(u5))
.send()
.await
.unwrap();
let cmp = CompletedMultipartUpload::builder()
.parts(
CompletedPart::builder()
.part_number(1)
.e_tag(p1.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(2)
.e_tag(p2.e_tag.unwrap())
.build(),
)
.build();
ctx.client
.complete_multipart_upload()
.bucket(&bucket)
.key("source2")
.upload_id(uid)
.multipart_upload(cmp)
.send()
.await
.unwrap();
}
// Our multipart object that does copy
let up = ctx
.client
.create_multipart_upload()
.bucket(&bucket)
.key("target")
.send()
.await
.unwrap();
let uid = up.upload_id.as_ref().unwrap();
let p3 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(3)
.body(ByteStream::from(u2))
.send()
.await
.unwrap();
let p1 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(1)
.body(ByteStream::from(u3))
.send()
.await
.unwrap();
let p2 = ctx
.client
.upload_part_copy()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(2)
.copy_source("uploadpartcopy/source2")
.copy_source_range("bytes=500-5500000")
.send()
.await
.unwrap();
let p4 = ctx
.client
.upload_part_copy()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(4)
.copy_source("uploadpartcopy/source1")
.copy_source_range("bytes=500-5500000")
.send()
.await
.unwrap();
let cmp = CompletedMultipartUpload::builder()
.parts(
CompletedPart::builder()
.part_number(1)
.e_tag(p1.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(2)
.e_tag(p2.copy_part_result.unwrap().e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(3)
.e_tag(p3.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(4)
.e_tag(p4.copy_part_result.unwrap().e_tag.unwrap())
.build(),
)
.build();
ctx.client
.complete_multipart_upload()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.multipart_upload(cmp)
.send()
.await
.unwrap();
// (check) Get object
let obj = ctx
.client
.get_object()
.bucket(&bucket)
.key("target")
.send()
.await
.unwrap();
let real_obj = obj
.body
.collect()
.await
.expect("Error reading data")
.into_bytes();
assert_eq!(real_obj.len(), exp_obj.len());
assert_eq!(real_obj, exp_obj);
}

266
src/garage/tests/objects.rs Normal file
View file

@ -0,0 +1,266 @@
use crate::common;
use aws_sdk_s3::model::{Delete, ObjectIdentifier};
use aws_sdk_s3::ByteStream;
const STD_KEY: &str = "hello world";
const CTRL_KEY: &str = "\x00\x01\x02\x00";
const UTF8_KEY: &str = "\u{211D}\u{1F923}\u{1F44B}";
const BODY: &[u8; 62] = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#[tokio::test]
async fn test_putobject() {
let ctx = common::context();
let bucket = ctx.create_bucket("putobject");
{
// Send an empty object (can serve as a directory marker)
// with a content type
let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
let content_type = "text/csv";
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key(STD_KEY)
.content_type(content_type)
.send()
.await
.unwrap();
assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
// We should check if Amazon is returning one when versioning is not enabled
assert!(r.version_id.is_some());
let _version = r.version_id.unwrap();
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, b"");
assert_eq!(o.e_tag.unwrap(), etag);
// We do not return version ID
// We should check if Amazon is returning one when versioning is not enabled
// assert_eq!(o.version_id.unwrap(), _version);
assert_eq!(o.content_type.unwrap(), content_type);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length, 0);
assert_eq!(o.parts_count, 0);
assert_eq!(o.tag_count, 0);
}
{
// Key with control characters,
// no content type and some data
let etag = "\"49f68a5c8493ec2c0bf489821c21fc3b\"";
let data = ByteStream::from_static(b"hi");
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key(CTRL_KEY)
.body(data)
.send()
.await
.unwrap();
assert_eq!(r.e_tag.unwrap().as_str(), etag);
assert!(r.version_id.is_some());
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(CTRL_KEY)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, b"hi");
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length, 2);
assert_eq!(o.parts_count, 0);
assert_eq!(o.tag_count, 0);
}
{
// Key with UTF8 codepoints including emoji
let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key(UTF8_KEY)
.send()
.await
.unwrap();
assert_eq!(r.e_tag.unwrap().as_str(), etag);
assert!(r.version_id.is_some());
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(UTF8_KEY)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, b"");
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length, 0);
assert_eq!(o.parts_count, 0);
assert_eq!(o.tag_count, 0);
}
}
#[tokio::test]
async fn test_getobject() {
let ctx = common::context();
let bucket = ctx.create_bucket("getobject");
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let data = ByteStream::from_static(BODY);
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key(STD_KEY)
.body(data)
.send()
.await
.unwrap();
assert_eq!(r.e_tag.unwrap().as_str(), etag);
{
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.range("bytes=1-9")
.send()
.await
.unwrap();
assert_eq!(o.content_range.unwrap().as_str(), "bytes 1-9/62");
assert_bytes_eq!(o.body, &BODY[1..10]);
}
{
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.range("bytes=9-")
.send()
.await
.unwrap();
assert_eq!(o.content_range.unwrap().as_str(), "bytes 9-61/62");
assert_bytes_eq!(o.body, &BODY[9..]);
}
{
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.range("bytes=-5")
.send()
.await
.unwrap();
assert_eq!(o.content_range.unwrap().as_str(), "bytes 57-61/62");
assert_bytes_eq!(o.body, &BODY[57..]);
}
}
#[tokio::test]
async fn test_deleteobject() {
let ctx = common::context();
let bucket = ctx.create_bucket("deleteobject");
let mut to_del = Delete::builder();
// add content without data
for i in 0..5 {
let k = format!("k-{}", i);
ctx.client
.put_object()
.bucket(&bucket)
.key(k.to_string())
.send()
.await
.unwrap();
if i > 0 {
to_del = to_del.objects(ObjectIdentifier::builder().key(k).build());
}
}
// add content with data
for i in 0..5 {
let k = format!("l-{}", i);
let data = ByteStream::from_static(BODY);
ctx.client
.put_object()
.bucket(&bucket)
.key(k.to_string())
.body(data)
.send()
.await
.unwrap();
if i > 0 {
to_del = to_del.objects(ObjectIdentifier::builder().key(k).build());
}
}
ctx.client
.delete_object()
.bucket(&bucket)
.key("k-0")
.send()
.await
.unwrap();
ctx.client
.delete_object()
.bucket(&bucket)
.key("l-0")
.send()
.await
.unwrap();
let r = ctx
.client
.delete_objects()
.bucket(&bucket)
.delete(to_del.build())
.send()
.await
.unwrap();
assert_eq!(r.deleted.unwrap().len(), 8);
let l = ctx
.client
.list_objects_v2()
.bucket(&bucket)
.send()
.await
.unwrap();
assert!(l.contents.is_none());
}

342
src/garage/tests/website.rs Normal file
View file

@ -0,0 +1,342 @@
use crate::common;
use crate::common::ext::*;
use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
ByteStream,
};
use http::Request;
use hyper::{
body::{to_bytes, Body},
Client,
};
const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
const BODY_ERR: &[u8; 6] = b"erreur";
#[tokio::test]
async fn test_website() {
const BCKT_NAME: &str = "my-website";
let ctx = common::context();
let bucket = ctx.create_bucket(BCKT_NAME);
let data = ByteStream::from_static(BODY);
ctx.client
.put_object()
.bucket(&bucket)
.key("index.html")
.body(data)
.send()
.await
.unwrap();
let client = Client::new();
let req = || {
Request::builder()
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap()
};
let mut resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 404);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
); /* check that we do not leak body */
ctx.garage
.command()
.args(["bucket", "website", "--allow", BCKT_NAME])
.quiet()
.expect_success_status("Could not allow website on bucket");
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
ctx.garage
.command()
.args(["bucket", "website", "--deny", BCKT_NAME])
.quiet()
.expect_success_status("Could not deny website on bucket");
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 404);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
); /* check that we do not leak body */
}
#[tokio::test]
async fn test_website_s3_api() {
const BCKT_NAME: &str = "my-cors";
let ctx = common::context();
let bucket = ctx.create_bucket(BCKT_NAME);
let data = ByteStream::from_static(BODY);
ctx.client
.put_object()
.bucket(&bucket)
.key("site/home.html")
.body(data)
.send()
.await
.unwrap();
ctx.client
.put_object()
.bucket(&bucket)
.key("err/error.html")
.body(ByteStream::from_static(BODY_ERR))
.send()
.await
.unwrap();
let conf = WebsiteConfiguration::builder()
.index_document(IndexDocument::builder().suffix("home.html").build())
.error_document(ErrorDocument::builder().key("err/error.html").build())
.build();
ctx.client
.put_bucket_website()
.bucket(&bucket)
.website_configuration(conf)
.send()
.await
.unwrap();
let cors = CorsConfiguration::builder()
.cors_rules(
CorsRule::builder()
.id("main-rule")
.allowed_headers("*")
.allowed_methods("GET")
.allowed_methods("PUT")
.allowed_origins("*")
.build(),
)
.build();
ctx.client
.put_bucket_cors()
.bucket(&bucket)
.cors_configuration(cors)
.send()
.await
.unwrap();
{
let cors_res = ctx
.client
.get_bucket_cors()
.bucket(&bucket)
.send()
.await
.unwrap();
let main_rule = cors_res.cors_rules().unwrap().iter().next().unwrap();
assert_eq!(main_rule.id.as_ref().unwrap(), "main-rule");
assert_eq!(
main_rule.allowed_headers.as_ref().unwrap(),
&vec!["*".to_string()]
);
assert_eq!(
main_rule.allowed_origins.as_ref().unwrap(),
&vec!["*".to_string()]
);
assert_eq!(
main_rule.allowed_methods.as_ref().unwrap(),
&vec!["GET".to_string(), "PUT".to_string()]
);
}
let client = Client::new();
// Test direct requests with CORS
{
let req = Request::builder()
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/site/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
}
// Test ErrorDocument on 404
{
let req = Request::builder()
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/wrong.html",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 404);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()
);
}
// Test CORS with an allowed preflight request
{
let req = Request::builder()
.method("OPTIONS")
.uri(format!(
"http://127.0.0.1:{}/site/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
}
// Test CORS with a forbidden preflight request
{
let req = Request::builder()
.method("OPTIONS")
.uri(format!(
"http://127.0.0.1:{}/site/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "DELETE")
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 403);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
}
//@TODO test CORS on the S3 endpoint. We need to handle auth manually to check it.
// Delete cors
ctx.client
.delete_bucket_cors()
.bucket(&bucket)
.send()
.await
.unwrap();
// Check CORS are deleted from the API
// @FIXME check what is the expected behavior when GetBucketCors is called on a bucket without
// any CORS.
assert!(ctx
.client
.get_bucket_cors()
.bucket(&bucket)
.send()
.await
.is_err());
// Test CORS are not sent anymore on a previously allowed request
{
let req = Request::builder()
.method("OPTIONS")
.uri(format!(
"http://127.0.0.1:{}/site/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 403);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
}
// Disallow website from the API
ctx.client
.delete_bucket_website()
.bucket(&bucket)
.send()
.await
.unwrap();
// Check that the website is not served anymore
{
let req = Request::builder()
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/site/",
common::garage::DEFAULT_PORT + 2
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap();
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 404);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()
);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
);
}
}

View file

@ -1,6 +1,6 @@
[package]
name = "garage_model"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -14,16 +14,16 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_rpc = { version = "0.6.0", path = "../rpc" }
garage_table = { version = "0.6.0", path = "../table" }
garage_util = { version = "0.6.0", path = "../util" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_model_050 = { package = "garage_model", version = "0.5.1" }
async-trait = "0.1.7"
arc-swap = "1.0"
err-derive = "0.3"
hex = "0.4"
log = "0.4"
tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
@ -36,6 +36,8 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4.1"

View file

@ -5,16 +5,24 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use futures::future::*;
use futures::select;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@ -23,15 +31,14 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_metrics::*;
use crate::block_ref_table::*;
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
pub const BACKGROUND_TRANQUILITY: u32 = 3;
pub const BACKGROUND_TRANQUILITY: u32 = 2;
// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@ -40,7 +47,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails
// and the time when it is retried.
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
@ -148,12 +156,15 @@ pub struct BlockManager {
rc: sled::Tree,
resync_queue: sled::Tree,
resync_queue: SledCountedTree,
resync_notify: Notify,
resync_errors: SledCountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
metrics: BlockManagerMetrics,
}
// This custom struct contains functions that must only be ran
@ -175,6 +186,12 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system
.netapp
@ -182,6 +199,8 @@ impl BlockManager {
let manager_locked = BlockManagerLocked();
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self {
replication,
data_dir,
@ -189,9 +208,11 @@ impl BlockManager {
rc,
resync_queue,
resync_notify: Notify::new(),
resync_errors,
system,
endpoint,
garage: ArcSwapOption::from(None),
metrics,
});
block_manager.endpoint.set_handler(block_manager.clone());
@ -380,15 +401,36 @@ impl BlockManager {
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
self.mutation_lock
let write_size = data.inner_buffer().len() as u64;
let res = self
.mutation_lock
.lock()
.await
.write_block(hash, data, self)
.await
.bound_record_duration(&self.metrics.block_write_duration)
.await?;
self.metrics.bytes_written.add(write_size);
Ok(res)
}
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
.await?;
self.metrics
.bytes_read
.add(data.inner_buffer().len() as u64);
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
@ -414,6 +456,8 @@ impl BlockManager {
};
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
self.mutation_lock
.lock()
.await
@ -423,7 +467,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
Ok(BlockRpc::PutBlock { hash: *hash, data })
Ok(data)
}
/// Check if this node should have a block, but don't actually have it
@ -467,21 +511,22 @@ impl BlockManager {
// ---- Resync loop ----
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch n simultaneous workers for background resync loop preprocessing
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
bm2.resync_loop(must_exit)
});
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
background.spawn_worker("block resync worker".into(), move |must_exit| {
self.resync_loop(must_exit)
});
}
});
}
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@ -517,19 +562,74 @@ impl BlockManager {
}
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
if let Some(first_pair_res) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_pair_res?;
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
let res = self.resync_block(&hash).await;
if let Err(e) = &res {
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
let ec = ErrorCounter::decode(ec);
if now < ec.next_try() {
// if next retry after an error is not yet,
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?;
// ec.next_try() > now >= time_msec, so this remove
// is not removing the one we added just above
self.resync_queue.remove(time_bytes)?;
return Ok(false);
}
}
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
.span_builder("Resync block")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
&trace_id.as_slice()[..16],
))
.unwrap(),
)
.with_attributes(vec![KeyValue::new("block", format!("{:?}", hash))])
.start(&tracer);
let res = self
.resync_block(&hash)
.with_context(Context::current_with_span(span))
.bound_record_duration(&self.metrics.resync_duration)
.await;
self.metrics.resync_counter.add(1);
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
None => ErrorCounter::new(now + 1),
};
self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?;
self.put_to_resync_at(&hash, err_counter.next_try())?;
// err_counter.next_try() >= now + 1 > now,
// the entry we remove from the queue is not
// the entry we inserted with put_to_resync_at
self.resync_queue.remove(time_bytes)?;
} else {
self.resync_errors.remove(hash.as_slice())?;
self.resync_queue.remove(time_bytes)?;
}
Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => {},
@ -607,6 +707,12 @@ impl BlockManager {
need_nodes.len()
);
for node in need_nodes.iter() {
self.metrics
.resync_send_counter
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
let put_block_message = self.read_block(hash).await?;
self.system
.rpc
@ -644,6 +750,9 @@ impl BlockManager {
);
let block_data = self.rpc_get_raw_block(hash).await?;
self.metrics.resync_recv_counter.add(1);
self.write_block(hash, &block_data).await?;
}
@ -760,9 +869,11 @@ impl BlockManagerLocked {
let data = data.inner_buffer();
let mut path = mgr.block_dir(hash);
fs::create_dir_all(&path).await?;
let directory = path.clone();
path.push(hex::encode(hash));
fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
(Ok(true), _) => return Ok(BlockRpc::Ok),
(Ok(false), false) => return Ok(BlockRpc::Ok),
@ -783,6 +894,7 @@ impl BlockManagerLocked {
path2.set_extension("tmp");
let mut f = fs::File::create(&path2).await?;
f.write_all(data).await?;
f.sync_all().await?;
drop(f);
fs::rename(path2, path).await?;
@ -790,6 +902,19 @@ impl BlockManagerLocked {
fs::remove_file(to_delete).await?;
}
// We want to ensure that when this function returns, data is properly persisted
// to disk. The first step is the sync_all above that does an fsync on the data file.
// Now, we do an fsync on the containing directory, to ensure that the rename
// is persisted properly. See:
// http://thedjbway.b0llix.net/qmail/syncdir.html
let dir = fs::OpenOptions::new()
.read(true)
.mode(0)
.open(directory)
.await?;
dir.sync_all().await?;
drop(dir);
Ok(BlockRpc::Ok)
}
@ -819,6 +944,7 @@ impl BlockManagerLocked {
path.set_extension("zst");
}
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}
Ok(())
}
@ -925,6 +1051,52 @@ impl RcEntry {
}
}
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
errors: u64,
last_try: u64,
}
impl ErrorCounter {
fn new(now: u64) -> Self {
Self {
errors: 1,
last_try: now,
}
}
fn decode(data: sled::IVec) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
u64::to_be_bytes(self.last_try),
]
.concat()
}
fn add1(self, now: u64) -> Self {
Self {
errors: self.errors + 1,
last_try: now,
}
}
fn delay_msec(&self) -> u64 {
(RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
}
fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;

102
src/model/block_metrics.rs Normal file
View file

@ -0,0 +1,102 @@
use opentelemetry::{global, metrics::*};
use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
pub(crate) resync_duration: BoundValueRecorder<f64>,
pub(crate) resync_send_counter: Counter<u64>,
pub(crate) resync_recv_counter: BoundCounter<u64>,
pub(crate) bytes_read: BoundCounter<u64>,
pub(crate) block_read_duration: BoundValueRecorder<f64>,
pub(crate) bytes_written: BoundCounter<u64>,
pub(crate) block_write_duration: BoundValueRecorder<f64>,
pub(crate) delete_counter: BoundCounter<u64>,
pub(crate) corruption_counter: BoundCounter<u64>,
}
impl BlockManagerMetrics {
pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| {
observer.observe(resync_queue.len() as u64, &[])
})
.with_description(
"Number of block hashes queued for local check and possible resync",
)
.init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
observer.observe(resync_errors.len() as u64, &[])
})
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")
.init()
.bind(&[]),
resync_error_counter: meter
.u64_counter("block.resync_error_counter")
.with_description("Number of calls to resync_block that returned an error")
.init()
.bind(&[]),
resync_duration: meter
.f64_value_recorder("block.resync_duration")
.with_description("Duration of resync_block operations")
.init()
.bind(&[]),
resync_send_counter: meter
.u64_counter("block.resync_send_counter")
.with_description("Number of blocks sent to another node in resync operations")
.init(),
resync_recv_counter: meter
.u64_counter("block.resync_recv_counter")
.with_description("Number of blocks received from other nodes in resync operations")
.init()
.bind(&[]),
bytes_read: meter
.u64_counter("block.bytes_read")
.with_description("Number of bytes read from disk")
.init()
.bind(&[]),
block_read_duration: meter
.f64_value_recorder("block.read_duration")
.with_description("Duration of block read operations")
.init()
.bind(&[]),
bytes_written: meter
.u64_counter("block.bytes_written")
.with_description("Number of bytes written to disk")
.init()
.bind(&[]),
block_write_duration: meter
.f64_value_recorder("block.write_duration")
.with_description("Duration of block write operations")
.init()
.bind(&[]),
delete_counter: meter
.u64_counter("block.delete_counter")
.with_description("Number of blocks deleted")
.init()
.bind(&[]),
corruption_counter: meter
.u64_counter("block.corruption_counter")
.with_description("Data corruptions detected on block reads")
.init()
.bind(&[]),
}
}
}

View file

@ -30,8 +30,7 @@ impl<'a> BucketHelper<'a> {
// the AWS spec, and hex-encoded UUIDs are 64 chars long.
let hexbucket = hex::decode(bucket_name.as_str())
.ok()
.map(|by| Uuid::try_from(&by))
.flatten();
.and_then(|by| Uuid::try_from(&by));
if let Some(bucket_id) = hexbucket {
Ok(self
.0
@ -46,8 +45,7 @@ impl<'a> BucketHelper<'a> {
.bucket_alias_table
.get(&EmptyKey, bucket_name)
.await?
.map(|x| *x.state.get())
.flatten())
.and_then(|x| *x.state.get()))
}
}

View file

@ -106,8 +106,7 @@ impl Key {
/// Get permissions for a bucket
pub fn bucket_permissions(&self, bucket: &Uuid) -> BucketKeyPerm {
self.params()
.map(|params| params.authorized_buckets.get(bucket))
.flatten()
.and_then(|params| params.authorized_buckets.get(bucket))
.cloned()
.unwrap_or(BucketKeyPerm::NO_PERMISSIONS)
}

View file

@ -1,5 +1,5 @@
#[macro_use]
extern crate log;
extern crate tracing;
pub mod permission;
@ -11,6 +11,7 @@ pub mod object_table;
pub mod version_table;
pub mod block;
mod block_metrics;
pub mod garage;
pub mod helper;

View file

@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -14,13 +14,14 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_util = { version = "0.6.0", path = "../util" }
garage_util = { version = "0.7.0", path = "../util" }
garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"
gethostname = "0.2"
hex = "0.4"
log = "0.4"
tracing = "0.1.30"
rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
@ -30,12 +31,24 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
serde_bytes = "0.11"
serde_json = "1.0"
# newer version requires rust edition 2021
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }
openssl = { version = "0.10", features = ["vendored"] }
schemars = "0.8"
# newer version requires rust edition 2021
pnet = "0.28"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
netapp = { version = "0.4.1", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }

View file

@ -51,10 +51,8 @@ pub async fn get_consul_nodes(
let pubkey = ent
.node_meta
.get("pubkey")
.map(|k| hex::decode(&k).ok())
.flatten()
.map(|k| NodeID::from_slice(&k[..]))
.flatten();
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
@ -139,10 +137,10 @@ pub async fn publish_consul_service(
let resp = client.request(req).await?;
debug!("Response of advertising to Consul: {:?}", resp);
let resp_code = resp.status();
let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
debug!(
"{}",
std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?)
.unwrap_or("<invalid utf8>")
std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
);
if resp_code != StatusCode::OK {

116
src/rpc/kubernetes.rs Normal file
View file

@ -0,0 +1,116 @@
use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};
use kube::{
api::{ListParams, Patch, PatchParams, PostParams},
Api, Client, CustomResource, CustomResourceExt,
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::error::Error;
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
group = "deuxfleurs.fr",
version = "v1",
kind = "GarageNode",
namespaced
)]
pub struct Node {
hostname: String,
address: IpAddr,
port: u16,
}
pub async fn create_kubernetes_crd() -> Result<(), Error> {
let client = Client::try_default().await?;
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
let params = PatchParams::apply(&format!("garage.{}", K8S_GROUP));
let crd = GarageNode::crd();
let patch = Patch::Apply(crd);
crds.patch(&format!("garagenodes.{}", K8S_GROUP), &params, &patch)
.await?;
Ok(())
}
pub async fn get_kubernetes_nodes(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
let lp = ListParams::default().labels(&format!(
"garage.{}/service={}",
K8S_GROUP, kubernetes_service_name
));
let nodes = nodes.list(&lp).await?;
let mut ret = Vec::with_capacity(nodes.items.len());
for node in nodes {
println!("Found Pod: {:?}", node.metadata.name);
let pubkey = &node
.metadata
.name
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let Some(pubkey) = pubkey {
ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
}
}
Ok(ret)
}
pub async fn publish_kubernetes_node(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), Error> {
let node_pubkey = hex::encode(node_id);
let mut node = GarageNode::new(
&node_pubkey,
Node {
hostname: hostname.to_string(),
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
);
let labels = node.metadata.labels.insert(BTreeMap::new());
labels.insert(
format!("garage.{}/service", K8S_GROUP),
kubernetes_service_name.to_string(),
);
debug!("Node object to be applied: {:#?}", node);
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
if let Ok(old_node) = nodes.get(&node_pubkey).await {
node.metadata.resource_version = old_node.metadata.resource_version;
nodes
.replace(&node_pubkey, &PostParams::default(), &node)
.await?;
} else {
nodes.create(&PostParams::default(), &node).await?;
};
Ok(())
}

View file

@ -1,14 +1,16 @@
//! Crate containing rpc related functions and types used in Garage
#[macro_use]
extern crate log;
extern crate tracing;
mod consul;
mod kubernetes;
pub mod layout;
pub mod ring;
pub mod system;
mod metrics;
pub mod rpc_helper;
pub use rpc_helper::*;

55
src/rpc/metrics.rs Normal file
View file

@ -0,0 +1,55 @@
use std::sync::Arc;
use opentelemetry::{global, metrics::*};
use tokio::sync::Semaphore;
/// TableMetrics reference all counter used for metrics
pub struct RpcMetrics {
pub(crate) _rpc_available_permits: ValueObserver<u64>,
pub(crate) rpc_counter: Counter<u64>,
pub(crate) rpc_timeout_counter: Counter<u64>,
pub(crate) rpc_netapp_error_counter: Counter<u64>,
pub(crate) rpc_garage_error_counter: Counter<u64>,
pub(crate) rpc_duration: ValueRecorder<f64>,
pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
pub fn new(sem: Arc<Semaphore>) -> Self {
let meter = global::meter("garage_rpc");
RpcMetrics {
_rpc_available_permits: meter
.u64_value_observer("rpc.available_permits", move |observer| {
observer.observe(sem.available_permits() as u64, &[])
})
.with_description("Number of available RPC permits")
.init(),
rpc_counter: meter
.u64_counter("rpc.request_counter")
.with_description("Number of RPC requests emitted")
.init(),
rpc_timeout_counter: meter
.u64_counter("rpc.timeout_counter")
.with_description("Number of RPC timeouts")
.init(),
rpc_netapp_error_counter: meter
.u64_counter("rpc.netapp_error_counter")
.with_description("Number of communication errors (errors in the Netapp library)")
.init(),
rpc_garage_error_counter: meter
.u64_counter("rpc.garage_error_counter")
.with_description("Number of RPC errors (errors happening when handling the RPC)")
.init(),
rpc_duration: meter
.f64_value_recorder("rpc.duration")
.with_description("Duration of RPCs")
.init(),
rpc_queueing_time: meter
.f64_value_recorder("rpc.queueing_time")
.with_description("Time RPC requests were queued for before being sent")
.init(),
}
}
}

View file

@ -9,6 +9,12 @@ use futures_util::future::FutureExt;
use tokio::select;
use tokio::sync::{watch, Semaphore};
use opentelemetry::KeyValue;
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, Span, TraceContextExt, Tracer},
Context,
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*;
@ -17,7 +23,9 @@ pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
@ -76,7 +84,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
request_buffer_semaphore: Semaphore,
request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
}
impl RpcHelper {
@ -86,12 +95,17 @@ impl RpcHelper {
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
) -> Self {
let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
let metrics = RpcMetrics::new(sem.clone());
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
request_buffer_semaphore: Semaphore::new(REQUEST_BUFFER_SIZE),
request_buffer_semaphore: sem,
metrics,
}))
}
@ -120,21 +134,45 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
KeyValue::new("from", format!("{:?}", self.0.our_node_id)),
KeyValue::new("to", format!("{:?}", to)),
];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
.0
.request_buffer_semaphore
.acquire_many(msg_size)
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?;
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
let rpc_call = endpoint
.call(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! {
res = endpoint.call(&node_id, &msg, strat.rs_priority) => {
res = rpc_call => {
drop(permit);
Ok(res??)
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
let res = res?;
if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
}
Ok(res?)
}
_ = tokio::time::sleep(strat.rs_timeout) => {
drop(permit);
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
}
@ -195,7 +233,47 @@ impl RpcHelper {
where
M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static,
S: Send,
S: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
let tracer = opentelemetry::global::tracer("garage");
let span_name = if strategy.rs_interrupt_after_quorum {
format!("RPC {} to {} of {}", endpoint.path(), quorum, to.len())
} else {
format!(
"RPC {} to {} (quorum {})",
endpoint.path(),
to.len(),
quorum
)
};
let mut span = tracer.start(span_name);
span.set_attribute(KeyValue::new("from", format!("{:?}", self.0.our_node_id)));
span.set_attribute(KeyValue::new("to", format!("{:?}", to)));
span.set_attribute(KeyValue::new("quorum", quorum as i64));
span.set_attribute(KeyValue::new(
"interrupt_after_quorum",
strategy.rs_interrupt_after_quorum.to_string(),
));
self.try_call_many_internal(endpoint, to, msg, strategy, quorum)
.with_context(Context::current_with_span(span))
.await
}
async fn try_call_many_internal<M, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: M,
strategy: RequestStrategy,
quorum: usize,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static,
S: Send + 'static,
{
let msg = Arc::new(msg);
@ -210,7 +288,6 @@ impl RpcHelper {
self2.call_arc(&endpoint2, to, msg, strategy).await
})
});
let quorum = strategy.rs_quorum.unwrap_or(to.len());
// Vectors in which success results and errors will be collected
let mut successes = vec![];
@ -245,8 +322,7 @@ impl RpcHelper {
let peer_avg_ping = peer_list
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.map(|pi| pi.avg_ping)
.flatten()
.and_then(|pi| pi.avg_ping)
.unwrap_or_else(|| Duration::from_secs(1));
(
to != self.0.our_node_id,
@ -274,8 +350,12 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
if let Some((_, _, _, _to, fut)) = requests.next() {
resp_stream.push(fut);
if let Some((_, _, _, req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
fut.with_context(Context::current_with_span(span)),
));
} else {
// If we have no request to add, we know that we won't ever
// reach quorum: bail out now.
@ -285,7 +365,7 @@ impl RpcHelper {
assert!(!resp_stream.is_empty()); // because of loop invariants
// Wait for one request to terminate
match resp_stream.next().await.unwrap() {
match resp_stream.next().await.unwrap().unwrap() {
Ok(msg) => {
successes.push(msg);
}

View file

@ -1,7 +1,7 @@
//! Module containing structs related to membership management
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
use crate::kubernetes::*;
use crate::layout::*;
use crate::ring::*;
use crate::rpc_helper::*;
@ -37,6 +38,9 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// Version tag used for version check upon Netapp connection
pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
@ -88,6 +92,11 @@ pub struct System {
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
kubernetes_service_name: Option<String>,
kubernetes_namespace: Option<String>,
kubernetes_skip_crd: bool,
replication_factor: usize,
/// The ring
@ -188,7 +197,10 @@ impl System {
) -> Arc<Self> {
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!("Node public key: {}", hex::encode(&node_key.public_key()));
info!(
"Node ID of this node: {}",
hex::encode(&node_key.public_key()[..8])
);
let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@ -216,13 +228,7 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
if let Some(addr) = config.rpc_public_addr {
println!("{}@{}", hex::encode(&node_key.public_key()), addr);
} else {
println!("{}", hex::encode(&node_key.public_key()));
}
let netapp = NetApp::new(network_key, node_key);
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(),
config.bootstrap_peers.clone(),
@ -247,6 +253,10 @@ impl System {
bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(),
kubernetes_service_name: config.kubernetes_service_name.clone(),
kubernetes_namespace: config.kubernetes_namespace.clone(),
kubernetes_skip_crd: config.kubernetes_skip_crd,
ring,
update_ring: Mutex::new(update_ring),
background,
@ -295,6 +305,44 @@ impl System {
.err_context("Error while publishing Consul service")
}
fn get_default_ip() -> IpAddr {
pnet::datalink::interfaces()
.iter()
.find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
.unwrap()
.ips
.first()
.unwrap()
.ip()
}
async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
let (kubernetes_service_name, kubernetes_namespace) =
match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
(Some(ch), Some(csn)) => (ch, csn),
_ => return Ok(()),
};
let rpc_public_addr =
match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("No rpc_public_addr configured, using first address on first network interface");
SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
}
};
publish_kubernetes_node(
kubernetes_service_name,
kubernetes_namespace,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
.err_context("Error while publishing node to kubernetes")
}
/// Save network configuration to disc
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
@ -470,6 +518,11 @@ impl System {
_ => None,
};
let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@ -503,6 +556,28 @@ impl System {
}
}
// Fetch peer list from Kubernetes
if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
if !self.kubernetes_skip_crd {
match create_kubernetes_crd().await {
Ok(()) => (),
Err(e) => {
error!("Failed to create kubernetes custom resource: {}", e)
}
};
}
match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
{
Ok(node_list) => {
ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve node list from Kubernetes: {}", e);
}
}
}
for (node_id, node_addr) in ping_list {
tokio::spawn(
self.netapp
@ -518,6 +593,8 @@ impl System {
}
self.background.spawn(self.clone().advertise_to_consul());
self.background
.spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {

View file

@ -1,6 +1,6 @@
[package]
name = "garage_table"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -14,13 +14,15 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_rpc = { version = "0.6.0", path = "../rpc" }
garage_util = { version = "0.6.0", path = "../util" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
bytes = "1.0"
hexdump = "0.1"
log = "0.4"
tracing = "0.1.30"
rand = "0.8"
sled = "0.34"

View file

@ -1,18 +1,19 @@
use core::borrow::Borrow;
use std::sync::Arc;
use log::warn;
use serde_bytes::ByteBuf;
use sled::Transactional;
use tokio::sync::Notify;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_rpc::system::System;
use crate::crdt::Crdt;
use crate::gc::GcTodoEntry;
use crate::metrics::*;
use crate::replication::*;
use crate::schema::*;
@ -27,7 +28,9 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: sled::Tree,
pub(crate) merkle_todo: sled::Tree,
pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree,
pub(crate) gc_todo: SledCountedTree,
pub(crate) metrics: TableMetrics,
}
impl<F, R> TableData<F, R>
@ -50,6 +53,9 @@ where
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open DB tree");
let gc_todo = SledCountedTree::new(gc_todo);
let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
Arc::new(Self {
system,
@ -60,6 +66,7 @@ where
merkle_todo,
merkle_todo_notify: Notify::new(),
gc_todo,
metrics,
})
}
@ -165,6 +172,8 @@ where
})?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
self.merkle_todo_notify.notify_one();
@ -199,6 +208,8 @@ where
})?;
if removed {
self.metrics.internal_delete_counter.add(1);
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
self.merkle_todo_notify.notify_one();

View file

@ -14,6 +14,7 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_rpc::system::System;
@ -362,7 +363,7 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
pub(crate) fn save(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
@ -372,7 +373,7 @@ impl GcTodoEntry {
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &SledCountedTree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
&self.todo_table_key()[..],
Some(self.value_hash),

View file

@ -2,8 +2,9 @@
#![allow(clippy::comparison_chain)]
#[macro_use]
extern crate log;
extern crate tracing;
mod metrics;
pub mod schema;
pub mod util;

View file

@ -3,7 +3,6 @@ use std::time::Duration;
use futures::select;
use futures_util::future::*;
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,

96
src/table/metrics.rs Normal file
View file

@ -0,0 +1,96 @@
use opentelemetry::{global, metrics::*, KeyValue};
use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics
pub struct TableMetrics {
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>,
pub(crate) get_request_duration: BoundValueRecorder<f64>,
pub(crate) put_request_counter: BoundCounter<u64>,
pub(crate) put_request_duration: BoundValueRecorder<f64>,
pub(crate) internal_update_counter: BoundCounter<u64>,
pub(crate) internal_delete_counter: BoundCounter<u64>,
pub(crate) sync_items_sent: Counter<u64>,
pub(crate) sync_items_received: Counter<u64>,
}
impl TableMetrics {
pub fn new(
table_name: &'static str,
merkle_todo: sled::Tree,
gc_todo: SledCountedTree,
) -> Self {
let meter = global::meter(table_name);
TableMetrics {
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
observer.observe(
merkle_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
)
},
)
.with_description("Merkle tree updater TODO queue length")
.init(),
_gc_todo_len: meter
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
)
},
)
.with_description("Table garbage collector TODO queue length")
.init(),
get_request_counter: meter
.u64_counter("table.get_request_counter")
.with_description("Number of get/get_range requests internally made on this table")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
get_request_duration: meter
.f64_value_recorder("table.get_request_duration")
.with_description("Duration of get/get_range requests internally made on this table, in seconds")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
put_request_counter: meter
.u64_counter("table.put_request_counter")
.with_description("Number of insert/insert_many requests internally made on this table")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
put_request_duration: meter
.f64_value_recorder("table.put_request_duration")
.with_description("Duration of insert/insert_many requests internally made on this table, in seconds")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
internal_update_counter: meter
.u64_counter("table.internal_update_counter")
.with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
internal_delete_counter: meter
.u64_counter("table.internal_delete_counter")
.with_description("Number of value deletions in the tree (due to GC or repartitioning)")
.init()
.bind(&[KeyValue::new("table_name", table_name)]),
sync_items_sent: meter
.u64_counter("table.sync_items_sent")
.with_description("Number of data items sent to other nodes during resync procedures")
.init(),
sync_items_received: meter
.u64_counter("table.sync_items_received")
.with_description("Number of data items received from other nodes during resync procedures")
.init(),
}
}
}

View file

@ -6,6 +6,7 @@ use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@ -312,6 +313,16 @@ where
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
for to in nodes.iter() {
self.data.metrics.sync_items_sent.add(
values.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new("to", format!("{:?}", to)),
],
);
}
self.system
.rpc
.try_call_many(
@ -500,6 +511,14 @@ where
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
self.data.metrics.sync_items_sent.add(
values.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new("to", format!("{:?}", who)),
],
);
let rpc_resp = self
.system
.rpc
@ -527,7 +546,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@ -539,6 +558,17 @@ where
Ok(SyncRpc::Node(k.clone(), node))
}
SyncRpc::Items(items) => {
self.data.metrics.sync_items_received.add(
items.len() as u64,
&[
KeyValue::new("table_name", F::TABLE_NAME),
KeyValue::new(
"from",
format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
),
],
);
self.data.update_many(items)?;
Ok(SyncRpc::Ok)
}

View file

@ -7,8 +7,14 @@ use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Context,
};
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
use garage_rpc::system::System;
use garage_rpc::*;
@ -81,6 +87,20 @@ where
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
self.insert_internal(e)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.put_request_counter.add(1);
Ok(())
}
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
@ -99,10 +119,25 @@ where
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
Ok(())
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
self.insert_many_internal(entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.put_request_counter.add(1);
Ok(())
}
async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
@ -148,10 +183,28 @@ where
self: &Arc<Self>,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get", F::TABLE_NAME));
let res = self
.get_internal(partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.get_request_counter.add(1);
Ok(res)
}
async fn get_internal(
self: &Arc<Self>,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
@ -198,6 +251,7 @@ where
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
Ok(ret)
}
@ -207,6 +261,27 @@ where
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
let res = self
.get_range_internal(partition_key, begin_sort_key, filter, limit)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
self.data.metrics.get_request_counter.add(1);
Ok(res)
}
async fn get_range_internal(
self: &Arc<Self>,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);

View file

@ -1,6 +1,6 @@
[package]
name = "garage_util"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@ -18,7 +18,7 @@ blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
log = "0.4"
tracing = "0.1.30"
rand = "0.8"
sha2 = "0.9"
@ -34,7 +34,13 @@ futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
netapp = "0.3.0"
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4.1"
http = "0.2"
hyper = "0.14"
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }
opentelemetry = "0.17"

View file

@ -52,6 +52,13 @@ pub struct Config {
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
/// Kubernetes namespace the service discovery resources are be created in
pub kubernetes_namespace: Option<String>,
/// Service name to filter for in k8s custom resources
pub kubernetes_service_name: Option<String>,
/// Skip creation of the garagenodes CRD
#[serde(default)]
pub kubernetes_skip_crd: bool,
/// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")]
@ -66,6 +73,10 @@ pub struct Config {
/// Configuration for serving files as normal web server
pub s3_web: WebConfig,
/// Configuration for the admin API endpoint
#[serde(default = "Default::default")]
pub admin: AdminConfig,
}
/// Configuration for S3 api
@ -89,6 +100,15 @@ pub struct WebConfig {
pub root_domain: String,
}
/// Configuration for the admin and monitoring HTTP API
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}
fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024
}

View file

@ -22,7 +22,7 @@ impl std::convert::AsRef<[u8]> for FixedBytes32 {
impl fmt::Debug for FixedBytes32 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", hex::encode(&self.0[..8]))
write!(f, "{}", hex::encode(&self.0[..8]))
}
}

View file

@ -23,6 +23,9 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError),
#[error(display = "kubernetes error: {}", _0)]
Kubernetes(#[error(source)] kube::Error),
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),

View file

@ -1,14 +1,16 @@
//! Crate containing common functions and types used in Garage
#[macro_use]
extern crate log;
extern crate tracing;
pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
pub mod error;
pub mod metrics;
pub mod persister;
pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;

57
src/util/metrics.rs Normal file
View file

@ -0,0 +1,57 @@
use std::time::SystemTime;
use futures::{future::BoxFuture, Future, FutureExt};
use rand::Rng;
use opentelemetry::{metrics::*, trace::TraceId, KeyValue};
pub trait RecordDuration<'a>: 'a {
type Output;
fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output>;
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
}
impl<'a, T, O> RecordDuration<'a> for T
where
T: Future<Output = O> + Send + 'a,
{
type Output = O;
fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output> {
async move {
let request_start = SystemTime::now();
let res = self.await;
r.record(
request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
attributes,
);
res
}
.boxed()
}
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
async move {
let request_start = SystemTime::now();
let res = self.await;
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
res
}
.boxed()
}
}
// ----
pub fn gen_trace_id() -> TraceId {
rand::thread_rng().gen::<[u8; 16]>().into()
}

100
src/util/sled_counter.rs Normal file
View file

@ -0,0 +1,100 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
#[derive(Clone)]
pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
struct SledCountedTreeInternal {
tree: Tree,
len: AtomicUsize,
}
impl SledCountedTree {
pub fn new(tree: Tree) -> Self {
let len = tree.len();
Self(Arc::new(SledCountedTreeInternal {
tree,
len: AtomicUsize::new(len),
}))
}
pub fn len(&self) -> usize {
self.0.len.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.0.tree.is_empty()
}
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
self.0.tree.get(key)
}
pub fn iter(&self) -> Iter {
self.0.tree.iter()
}
// ---- writing functions ----
pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
where
K: AsRef<[u8]>,
V: Into<IVec>,
{
let res = self.0.tree.insert(key, value);
if res == Ok(None) {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
res
}
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
let res = self.0.tree.remove(key);
if matches!(res, Ok(Some(_))) {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
res
}
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {
self.0.len.fetch_sub(1, Ordering::Relaxed);
};
res
}
pub fn compare_and_swap<K, OV, NV>(
&self,
key: K,
old: Option<OV>,
new: Option<NV>,
) -> Result<std::result::Result<(), CompareAndSwapError>>
where
K: AsRef<[u8]>,
OV: AsRef<[u8]>,
NV: Into<IVec>,
{
let old_some = old.is_some();
let new_some = new.is_some();
let res = self.0.tree.compare_and_swap(key, old, new);
if res == Ok(Ok(())) {
match (old_some, new_some) {
(false, true) => {
self.0.len.fetch_add(1, Ordering::Relaxed);
}
(true, false) => {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
_ => (),
}
}
res
}
}

View file

@ -1,6 +1,6 @@
[package]
name = "garage_web"
version = "0.6.0"
version = "0.7.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018"
license = "AGPL-3.0"
@ -14,16 +14,18 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_api = { version = "0.6.0", path = "../api" }
garage_model = { version = "0.6.0", path = "../model" }
garage_util = { version = "0.6.0", path = "../util" }
garage_table = { version = "0.6.0", path = "../table" }
garage_api = { version = "0.7.0", path = "../api" }
garage_model = { version = "0.7.0", path = "../model" }
garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" }
err-derive = "0.3"
log = "0.4"
tracing = "0.1.30"
percent-encoding = "2.1.0"
futures = "0.3"
http = "0.2"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
opentelemetry = "0.17"

View file

@ -1,6 +1,6 @@
//! Crate for handling web serving of s3 bucket
#[macro_use]
extern crate log;
extern crate tracing;
mod error;
pub use error::Error;

View file

@ -9,17 +9,51 @@ use hyper::{
Body, Method, Request, Response, Server,
};
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue,
};
use crate::error::*;
use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError};
use garage_api::helpers::{authority_to_host, host_to_bucket};
use garage_api::s3_cors::{add_cors_headers, find_matching_cors_rule, handle_options};
use garage_api::s3_cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3_get::{handle_get, handle_head};
use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
use garage_util::metrics::{gen_trace_id, RecordDuration};
struct WebMetrics {
request_counter: Counter<u64>,
error_counter: Counter<u64>,
request_duration: ValueRecorder<f64>,
}
impl WebMetrics {
fn new() -> Self {
let meter = global::meter("garage/web");
Self {
request_counter: meter
.u64_counter("web.request_counter")
.with_description("Number of requests to the web endpoint")
.init(),
error_counter: meter
.u64_counter("web.error_counter")
.with_description("Number of requests to the web endpoint resulting in errors")
.init(),
request_duration: meter
.f64_value_recorder("web.request_duration")
.with_description("Duration of requests to the web endpoint")
.init(),
}
}
}
/// Run a web server
pub async fn run_web_server(
@ -28,13 +62,19 @@ pub async fn run_web_server(
) -> Result<(), GarageError> {
let addr = &garage.config.s3_web.bind_addr;
let metrics = Arc::new(WebMetrics::new());
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr();
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
let garage = garage.clone();
handle_request(garage, req, client_addr)
let metrics = metrics.clone();
handle_request(garage, metrics, req, client_addr)
}))
}
});
@ -49,23 +89,55 @@ pub async fn run_web_server(
async fn handle_request(
garage: Arc<Garage>,
metrics: Arc<WebMetrics>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
info!("{} {} {}", addr, req.method(), req.uri());
match serve_file(garage, &req).await {
// Lots of instrumentation
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder(format!("Web {} request", req.method()))
.with_trace_id(gen_trace_id())
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
// The actual handler
let res = serve_file(garage, &req)
.with_context(Context::current_with_span(span))
.record_duration(&metrics.request_duration, &metrics_tags[..])
.await;
// More instrumentation
metrics.request_counter.add(1, &metrics_tags[..]);
// Returning the result
match res {
Ok(res) => {
debug!("{} {} {}", req.method(), req.uri(), res.status());
debug!("{} {} {}", req.method(), res.status(), req.uri());
Ok(res)
}
Err(error) => {
info!(
"{} {} {} {}",
req.method(),
req.uri(),
error.http_status_code(),
req.uri(),
error
);
metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", error.http_status_code().to_string()),
],
);
Ok(error_to_res(error))
}
}
@ -103,8 +175,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
.bucket_alias_table
.get(&EmptyKey, &bucket_name.to_string())
.await?
.map(|x| x.state.take())
.flatten()
.and_then(|x| x.state.take())
.ok_or(Error::NotFound)?;
// Check bucket isn't deleted and has website access enabled
@ -133,7 +204,7 @@ async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response
);
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options(req, &bucket).await,
Method::OPTIONS => handle_options_for_bucket(req, &bucket),
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::BadRequest("HTTP method not supported".into())),