Garage v0.9 #473

Merged
lx merged 175 commits from next into main 2023-10-10 13:28:29 +00:00
40 changed files with 3593 additions and 2600 deletions
Showing only changes of commit 28d7a49f63 - Show all commits

View file

@ -19,9 +19,11 @@ steps:
- name: unit + func tests
image: nixpkgs/nix:nixos-22.05
environment:
GARAGE_TEST_INTEGRATION_EXE: result/bin/garage
GARAGE_TEST_INTEGRATION_EXE: result-bin/bin/garage
commands:
- nix-build --no-build-output --attr clippy.amd64 --argstr git_version ${DRONE_TAG:-$DRONE_COMMIT}
- nix-build --no-build-output --attr test.amd64
- ./result/bin/garage_db-*
- ./result/bin/garage_api-*
- ./result/bin/garage_model-*
- ./result/bin/garage_rpc-*
@ -30,6 +32,7 @@ steps:
- ./result/bin/garage_web-*
- ./result/bin/garage-*
- ./result/bin/integration-*
- rm result
- name: integration tests
image: nixpkgs/nix:nixos-22.05
@ -58,7 +61,7 @@ steps:
image: nixpkgs/nix:nixos-22.05
commands:
- nix-build --no-build-output --attr pkgs.amd64.release --argstr git_version ${DRONE_TAG:-$DRONE_COMMIT}
- nix-shell --attr rust --run "./script/not-dynamic.sh result/bin/garage"
- nix-shell --attr rust --run "./script/not-dynamic.sh result-bin/bin/garage"
- name: integration
image: nixpkgs/nix:nixos-22.05
@ -109,7 +112,7 @@ steps:
image: nixpkgs/nix:nixos-22.05
commands:
- nix-build --no-build-output --attr pkgs.i386.release --argstr git_version ${DRONE_TAG:-$DRONE_COMMIT}
- nix-shell --attr rust --run "./script/not-dynamic.sh result/bin/garage"
- nix-shell --attr rust --run "./script/not-dynamic.sh result-bin/bin/garage"
- name: integration
image: nixpkgs/nix:nixos-22.05
@ -159,7 +162,7 @@ steps:
image: nixpkgs/nix:nixos-22.05
commands:
- nix-build --no-build-output --attr pkgs.arm64.release --argstr git_version ${DRONE_TAG:-$DRONE_COMMIT}
- nix-shell --attr rust --run "./script/not-dynamic.sh result/bin/garage"
- nix-shell --attr rust --run "./script/not-dynamic.sh result-bin/bin/garage"
- name: push static binary
image: nixpkgs/nix:nixos-22.05
@ -204,7 +207,7 @@ steps:
image: nixpkgs/nix:nixos-22.05
commands:
- nix-build --no-build-output --attr pkgs.arm.release --argstr git_version ${DRONE_TAG:-$DRONE_COMMIT}
- nix-shell --attr rust --run "./script/not-dynamic.sh result/bin/garage"
- nix-shell --attr rust --run "./script/not-dynamic.sh result-bin/bin/garage"
- name: push static binary
image: nixpkgs/nix:nixos-22.05
@ -280,6 +283,6 @@ trigger:
---
kind: signature
hmac: 103a04785c98f5376a63ce22865c2576963019bbc4d828f200d2a470a3c821ea
hmac: ac09a5a8c82502f67271f93afa1e1e21ce66383b8e24a6deb26b285cc1c378ba
...

747
Cargo.lock generated

File diff suppressed because it is too large Load diff

4153
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -3,5 +3,5 @@ FROM scratch
ENV RUST_BACKTRACE=1
ENV RUST_LOG=garage=info
COPY result/bin/garage /
COPY result-bin/bin/garage /
CMD [ "/garage", "server"]

View file

@ -5,13 +5,26 @@
with import ./nix/common.nix;
let
let
pkgs = import pkgsSrc { };
compile = import ./nix/compile.nix;
build_debug_and_release = (target: {
debug = (compile { inherit target git_version; release = false; }).workspace.garage { compileMode = "build"; };
release = (compile { inherit target git_version; release = true; }).workspace.garage { compileMode = "build"; };
debug = (compile {
inherit target git_version;
release = false;
}).workspace.garage {
compileMode = "build";
};
release = (compile {
inherit target git_version;
release = true;
}).workspace.garage {
compileMode = "build";
};
});
test = (rustPkgs: pkgs.symlinkJoin {
name ="garage-tests";
paths = builtins.map (key: rustPkgs.workspace.${key} { compileMode = "test"; }) (builtins.attrNames rustPkgs.workspace);
@ -25,9 +38,25 @@ in {
arm = build_debug_and_release "armv6l-unknown-linux-musleabihf";
};
test = {
amd64 = test (compile { inherit git_version; target = "x86_64-unknown-linux-musl"; });
amd64 = test (compile {
inherit git_version;
target = "x86_64-unknown-linux-musl";
features = [
"garage/bundled-libs"
"garage/k2v"
"garage/sled"
"garage/lmdb"
"garage/sqlite"
];
});
};
clippy = {
amd64 = (compile { inherit git_version; compiler = "clippy"; }).workspace.garage { compileMode = "build"; } ;
amd64 = (compile {
inherit git_version;
target = "x86_64-unknown-linux-musl";
compiler = "clippy";
}).workspace.garage {
compileMode = "build";
};
};
}

View file

@ -9,7 +9,7 @@ In this section, we cover the following web applications:
|------|--------|------|
| [Nextcloud](#nextcloud) | ✅ | Both Primary Storage and External Storage are supported |
| [Peertube](#peertube) | ✅ | Must be configured with the website endpoint |
| [Mastodon](#mastodon) | ❓ | Not yet tested |
| [Mastodon](#mastodon) | ✅ | Natively supported |
| [Matrix](#matrix) | ✅ | Tested with `synapse-s3-storage-provider` |
| [Pixelfed](#pixelfed) | ❓ | Not yet tested |
| [Pleroma](#pleroma) | ❓ | Not yet tested |
@ -224,7 +224,135 @@ You can now reload the page and see in your browser console that data are fetche
## Mastodon
https://docs.joinmastodon.org/admin/config/#cdn
Mastodon natively supports the S3 protocol to store media files, and it works out-of-the-box with Garage.
You will need to expose your Garage bucket as a website: that way, media files will be served directly from Garage.
### Performance considerations
Mastodon tends to store many small objects over time: expect hundreds of thousands of objects,
with average object size ranging from 50 KB to 150 KB.
As such, your Garage cluster should be configured appropriately for good performance:
- use Garage v0.8.0 or higher with the [LMDB database engine](@documentation/reference-manual/configuration.md#db-engine-since-v0-8-0).
With the default Sled database engine, your database could quickly end up taking tens of GB of disk space.
- the Garage database should be stored on a SSD
### Creating your bucket
This is the usual Garage setup:
```bash
garage key new --name mastodon-key
garage bucket create mastodon-data
garage bucket allow mastodon-data --read --write --key mastodon-key
```
Note the Key ID and Secret Key.
### Exposing your bucket as a website
Create a DNS name to serve your media files, such as `my-social-media.mydomain.tld`.
This name will be publicly exposed to the users of your Mastodon instance: they
will load images directly from this DNS name.
As [documented here](@/documentation/cookbook/exposing-websites.md),
add this DNS name as alias to your bucket, and expose it as a website:
```bash
garage bucket alias mastodon-data my-social-media.mydomain.tld
garage bucket website --allow mastodon-data
```
Then you will likely need to [setup a reverse proxy](@/documentation/cookbook/reverse-proxy.md)
in front of it to serve your media files over HTTPS.
### Cleaning up old media files before migration
Mastodon instance quickly accumulate a lot of media files from the federation.
Most of them are not strictly necessary because they can be fetched again from
other servers. As such, it is highly recommended to clean them up before
migration, this will greatly reduce the migration time.
From the [official Mastodon documentation](https://docs.joinmastodon.org/admin/tootctl/#media):
```bash
$ RAILS_ENV=production bin/tootctl media remove --days 3
$ RAILS_ENV=production bin/tootctl media remove-orphans
$ RAILS_ENV=production bin/tootctl preview_cards remove --days 15
```
Here is a typical disk usage for a small but multi-year instance after cleanup:
```bash
$ RAILS_ENV=production bin/tootctl media usage
Attachments: 5.67 GB (1.14 GB local)
Custom emoji: 295 MB (0 Bytes local)
Preview cards: 154 MB
Avatars: 3.77 GB (127 KB local)
Headers: 8.72 GB (242 KB local)
Backups: 0 Bytes
Imports: 1.7 KB
Settings: 0 Bytes
```
Unfortunately, [old avatars and headers cannot currently be cleaned up](https://github.com/mastodon/mastodon/issues/9567).
### Migrating your data
Data migration should be done with an efficient S3 client.
The [minio client](@documentation/connect/cli.md#minio-client) is a good choice
thanks to its mirror mode:
```bash
mc mirror ./public/system/ garage/mastodon-data
```
Here is a typical bucket usage after all data has been migrated:
```bash
$ garage bucket info mastodon-data
Size: 20.3 GiB (21.8 GB)
Objects: 175968
```
### Configuring Mastodon
In your `.env.production` configuration file:
```bash
S3_ENABLED=true
# Internal access to Garage
S3_ENDPOINT=http://my-garage-instance.mydomain.tld:3900
S3_REGION=garage
S3_BUCKET=mastodon-data
# Change this (Key ID and Secret Key of your Garage key)
AWS_ACCESS_KEY_ID=GKe88df__CHANGETHIS__c5145
AWS_SECRET_ACCESS_KEY=a2f7__CHANGETHIS__77fcfcf7a58f47a4aa4431f2e675c56da37821a1070000
# What name gets exposed to users (HTTPS is implicit)
S3_ALIAS_HOST=my-social-media.mydomain.tld
```
For more details, see the [reference Mastodon documentation](https://docs.joinmastodon.org/admin/config/#cdn).
Restart all Mastodon services and everything should now be using Garage!
You can check the URLs of images in the Mastodon web client, they should start
with `https://my-social-media.mydomain.tld`.
### Last migration sync
After Mastodon is successfully using Garage, you can run a last sync from the local filesystem to Garage:
```bash
mc mirror --newer-than "3h" ./public/system/ garage/mastodon-data
```
### References
[cybrespace's guide to migrate to S3](https://github.com/cybrespace/cybrespace-meta/blob/master/s3.md)
(the guide is for Amazon S3, so the configuration is a bit different, but the rest is similar)
## Matrix

View file

@ -13,6 +13,9 @@ db_engine = "lmdb"
block_size = 1048576
sled_cache_capacity = 134217728
sled_flush_every_ms = 2000
replication_mode = "3"
compression_level = 1
@ -28,15 +31,20 @@ bootstrap_peers = [
"212fd62eeaca72c122b45a7f4fa0f55e012aa5e24ac384a72a3016413fa724ff@[fc00:F::1]:3901",
]
consul_host = "consul.service"
consul_service_name = "garage-daemon"
kubernetes_namespace = "garage"
kubernetes_service_name = "garage-daemon"
kubernetes_skip_crd = false
[consul_discovery]
consul_http_addr = "http://127.0.0.1:8500"
service_name = "garage-daemon"
ca_cert = "/etc/consul/consul-ca.crt"
client_cert = "/etc/consul/consul-client.crt"
client_key = "/etc/consul/consul-key.crt"
tls_skip_verify = false
[kubernetes_discovery]
namespace = "garage"
service_name = "garage-daemon"
skip_crd = false
sled_cache_capacity = 134217728
sled_flush_every_ms = 2000
[s3_api]
api_bind_addr = "[::]:3900"
@ -129,6 +137,21 @@ files will remain available. This however means that chunks from existing files
will not be deduplicated with chunks from newly uploaded files, meaning you
might use more storage space that is optimally possible.
### `sled_cache_capacity`
This parameter can be used to tune the capacity of the cache used by
[sled](https://sled.rs), the database Garage uses internally to store metadata.
Tune this to fit the RAM you wish to make available to your Garage instance.
This value has a conservative default (128MB) so that Garage doesn't use too much
RAM by default, but feel free to increase this for higher performance.
### `sled_flush_every_ms`
This parameters can be used to tune the flushing interval of sled.
Increase this if sled is thrashing your SSD, at the risk of losing more data in case
of a power outage (though this should not matter much as data is replicated on other
nodes). The default value, 2000ms, should be appropriate for most use cases.
### `replication_mode`
Garage supports the following replication modes:
@ -276,48 +299,58 @@ be obtained by running `garage node id` and then included directly in the
key will be returned by `garage node id` and you will have to add the IP
yourself.
### `consul_host` and `consul_service_name`
## The `[consul_discovery]` section
Garage supports discovering other nodes of the cluster using Consul. For this
to work correctly, nodes need to know their IP address by which they can be
reached by other nodes of the cluster, which should be set in `rpc_public_addr`.
The `consul_host` parameter should be set to the hostname of the Consul server,
and `consul_service_name` should be set to the service name under which Garage's
### `consul_http_addr` and `service_name`
The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server.
### `service_name`
`service_name` should be set to the service name under which Garage's
RPC ports are announced.
Garage does not yet support talking to Consul over TLS.
### `client_cert`, `client_key`
### `kubernetes_namespace`, `kubernetes_service_name` and `kubernetes_skip_crd`
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
### `ca_cert`
TLS CA certificate to use when communicating with Consul over TLS.
### `tls_skip_verify`
Skip server hostname verification in TLS handshake.
`ca_cert` is ignored when this is set.
## The `[kubernetes_discovery]` section
Garage supports discovering other nodes of the cluster using kubernetes custom
resources. For this to work `kubernetes_namespace` and `kubernetes_service_name`
need to be configured.
resources. For this to work, a `[kubernetes_discovery]` section must be present
with at least the `namespace` and `service_name` parameters.
`kubernetes_namespace` sets the namespace in which the custom resources are
configured. `kubernetes_service_name` is added as a label to these resources to
### `namespace`
`namespace` sets the namespace in which the custom resources are
configured.
### `service_name`
`service_name` is added as a label to the advertised resources to
filter them, to allow for multiple deployments in a single namespace.
`kubernetes_skip_crd` can be set to true to disable the automatic creation and
### `skip_crd`
`skip_crd` can be set to true to disable the automatic creation and
patching of the `garagenodes.deuxfleurs.fr` CRD. You will need to create the CRD
manually.
### `sled_cache_capacity`
This parameter can be used to tune the capacity of the cache used by
[sled](https://sled.rs), the database Garage uses internally to store metadata.
Tune this to fit the RAM you wish to make available to your Garage instance.
This value has a conservative default (128MB) so that Garage doesn't use too much
RAM by default, but feel free to increase this for higher performance.
### `sled_flush_every_ms`
This parameters can be used to tune the flushing interval of sled.
Increase this if sled is thrashing your SSD, at the risk of losing more data in case
of a power outage (though this should not matter much as data is replicated on other
nodes). The default value, 2000ms, should be appropriate for most use cases.
## The `[s3_api]` section

View file

@ -106,7 +106,7 @@ to be manually connected to one another.
### Support for changing IP addresses
As long as all of your nodes don't thange their IP address at the same time,
As long as all of your nodes don't change their IP address at the same time,
Garage should be able to tolerate nodes with changing/dynamic IP addresses,
as nodes will regularly exchange the IP addresses of their peers and try to
reconnect using newer addresses when existing connections are broken.

View file

@ -206,8 +206,8 @@ and responses need to be translated.
Query parameters:
| name | default value | meaning |
| - | - | - |
| name | default value | meaning |
|------------|---------------|----------------------------------|
| `sort_key` | **mandatory** | The sort key of the item to read |
Returns the item with specified partition key and sort key. Values can be
@ -317,11 +317,11 @@ an HTTP 304 NOT MODIFIED is returned.
Query parameters:
| name | default value | meaning |
| - | - | - |
| `sort_key` | **mandatory** | The sort key of the item to read |
| `causality_token` | **mandatory** | The causality token of the last known value or set of values |
| `timeout` | 300 | The timeout before 304 NOT MODIFIED is returned if the value isn't updated |
| name | default value | meaning |
|-------------------|---------------|----------------------------------------------------------------------------|
| `sort_key` | **mandatory** | The sort key of the item to read |
| `causality_token` | **mandatory** | The causality token of the last known value or set of values |
| `timeout` | 300 | The timeout before 304 NOT MODIFIED is returned if the value isn't updated |
The timeout can be set to any number of seconds, with a maximum of 600 seconds (10 minutes).
@ -346,7 +346,7 @@ myblobblahblahblah
Example response:
```
HTTP/1.1 200 OK
HTTP/1.1 204 No Content
```
**DeleteItem: `DELETE /<bucket>/<partition key>?sort_key=<sort_key>`**
@ -382,13 +382,13 @@ as these values are asynchronously updated, and thus eventually consistent.
Query parameters:
| name | default value | meaning |
| - | - | - |
| `prefix` | `null` | Restrict listing to partition keys that start with this prefix |
| `start` | `null` | First partition key to list, in lexicographical order |
| `end` | `null` | Last partition key to list (excluded) |
| `limit` | `null` | Maximum number of partition keys to list |
| `reverse` | `false` | Iterate in reverse lexicographical order |
| name | default value | meaning |
|-----------|---------------|----------------------------------------------------------------|
| `prefix` | `null` | Restrict listing to partition keys that start with this prefix |
| `start` | `null` | First partition key to list, in lexicographical order |
| `end` | `null` | Last partition key to list (excluded) |
| `limit` | `null` | Maximum number of partition keys to list |
| `reverse` | `false` | Iterate in reverse lexicographical order |
The response consists in a JSON object that repeats the parameters of the query and gives the result (see below).
@ -512,7 +512,7 @@ POST /my_bucket HTTP/1.1
Example response:
```
HTTP/1.1 200 OK
HTTP/1.1 204 NO CONTENT
```
@ -525,17 +525,17 @@ The request body is a JSON list of searches, that each specify a range of
items to get (to get single items, set `singleItem` to `true`). A search is a
JSON struct with the following fields:
| name | default value | meaning |
| - | - | - |
| `partitionKey` | **mandatory** | The partition key in which to search |
| `prefix` | `null` | Restrict items to list to those whose sort keys start with this prefix |
| `start` | `null` | The sort key of the first item to read |
| `end` | `null` | The sort key of the last item to read (excluded) |
| `limit` | `null` | The maximum number of items to return |
| `reverse` | `false` | Iterate in reverse lexicographical order on sort keys |
| `singleItem` | `false` | Whether to return only the item with sort key `start` |
| `conflictsOnly` | `false` | Whether to return only items that have several concurrent values |
| `tombstones` | `false` | Whether or not to return tombstone lines to indicate the presence of old deleted items |
| name | default value | meaning |
|-----------------|---------------|----------------------------------------------------------------------------------------|
| `partitionKey` | **mandatory** | The partition key in which to search |
| `prefix` | `null` | Restrict items to list to those whose sort keys start with this prefix |
| `start` | `null` | The sort key of the first item to read |
| `end` | `null` | The sort key of the last item to read (excluded) |
| `limit` | `null` | The maximum number of items to return |
| `reverse` | `false` | Iterate in reverse lexicographical order on sort keys |
| `singleItem` | `false` | Whether to return only the item with sort key `start` |
| `conflictsOnly` | `false` | Whether to return only items that have several concurrent values |
| `tombstones` | `false` | Whether or not to return tombstone lines to indicate the presence of old deleted items |
For each of the searches, triplets are listed and returned separately. The
@ -683,7 +683,7 @@ POST /my_bucket?delete HTTP/1.1
Example response:
```
```json
HTTP/1.1 200 OK
[

View file

@ -3,20 +3,20 @@ rec {
* Fixed dependencies
*/
pkgsSrc = fetchTarball {
# As of 2021-10-04
url = "https://github.com/NixOS/nixpkgs/archive/b27d18a412b071f5d7991d1648cfe78ee7afe68a.tar.gz";
sha256 = "1xy9zpypqfxs5gcq5dcla4bfkhxmh5nzn9dyqkr03lqycm9wg5cr";
# As of 2022-10-13
url = "https://github.com/NixOS/nixpkgs/archive/a3073c49bc0163fea6a121c276f526837672b555.zip";
sha256 = "1bz632psfbpmicyzjb8b4265y50shylccvfm6ry6mgnv5hvz324s";
};
cargo2nixSrc = fetchGit {
# As of 2022-08-29, stacking two patches: superboum@dedup_propagate and Alexis211@fix_fetchcrategit
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
url = "https://github.com/Alexis211/cargo2nix";
ref = "fix_fetchcrategit";
rev = "4b31c0cc05b6394916d46e9289f51263d81973b9";
ref = "custom_unstable";
rev = "a7a61179b66054904ef6a195d8da736eaaa06c36";
};
/*
* Shared objects
*/
cargo2nix = import cargo2nixSrc;
cargo2nixOverlay = import "${cargo2nixSrc}/overlay";
cargo2nixOverlay = cargo2nix.overlays.default;
}

View file

@ -1,9 +1,10 @@
{
system ? builtins.currentSystem,
target ? null,
target,
compiler ? "rustc",
release ? false,
git_version ? null,
features ? null,
}:
with import ./common.nix;
@ -12,71 +13,41 @@ let
log = v: builtins.trace v v;
pkgs = import pkgsSrc {
inherit system;
${ if target == null then null else "crossSystem" } = { config = target; };
inherit system;
crossSystem = {
config = target;
isStatic = true;
};
overlays = [ cargo2nixOverlay ];
};
/*
Rust and Nix triples are not the same. Cargo2nix has a dedicated library
to convert Nix triples to Rust ones. We need this conversion as we want to
set later options linked to our (rust) target in a generic way. Not only
the triple terminology is different, but also the "roles" are named differently.
Nix uses a build/host/target terminology where Nix's "host" maps to Cargo's "target".
*/
rustTarget = log (pkgs.rustBuilder.rustLib.rustTriple pkgs.stdenv.hostPlatform);
/*
Cargo2nix is built for rustOverlay which installs Rust from Mozilla releases.
We want our own Rust to avoid incompatibilities, like we had with musl 1.2.0.
rustc was built with musl < 1.2.0 and nix shipped musl >= 1.2.0 which lead to compilation breakage.
This is fine for 64-bit platforms, but for 32-bit platforms, we need our own Rust
to avoid incompatibilities with time_t between different versions of musl
(>= 1.2.0 shipped by NixOS, < 1.2.0 with which rustc was built), which lead to compilation breakage.
So we want a Rust release that is bound to our Nix repository to avoid these problems.
See here for more info: https://musl.libc.org/time64.html
Because Cargo2nix does not support the Rust environment shipped by NixOS,
we emulate the structure of the Rust object created by rustOverlay.
In practise, rustOverlay ships rustc+cargo in a single derivation while
NixOS ships them in separate ones. We reunite them with symlinkJoin.
*/
rustChannel = {
rustc = pkgs.symlinkJoin {
name = "rust-channel";
paths = [
pkgs.rustPlatform.rust.cargo
pkgs.rustPlatform.rust.rustc
];
*/
toolchainOptions =
if target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then {
rustVersion = "1.63.0";
extraRustComponents = [ "clippy" ];
} else {
rustToolchain = pkgs.symlinkJoin {
name = "rust-static-toolchain-${target}";
paths = [
pkgs.rustPlatform.rust.cargo
pkgs.rustPlatform.rust.rustc
# clippy not needed, it only runs on amd64
];
};
};
clippy = pkgs.symlinkJoin {
name = "clippy-channel";
paths = [
pkgs.rustPlatform.rust.cargo
pkgs.rustPlatform.rust.rustc
pkgs.clippy
];
};
}.${compiler};
clippyBuilder = pkgs.writeScriptBin "clippy" ''
#!${pkgs.stdenv.shell}
. ${cargo2nixSrc + "/overlay/utils.sh"}
isBuildScript=
args=("$@")
for i in "''${!args[@]}"; do
if [ "xmetadata=" = "x''${args[$i]::9}" ]; then
args[$i]=metadata=$NIX_RUST_METADATA
elif [ "x--crate-name" = "x''${args[$i]}" ] && [ "xbuild_script_" = "x''${args[$i+1]::13}" ]; then
isBuildScript=1
fi
done
if [ "$isBuildScript" ]; then
args+=($NIX_RUST_BUILD_LINK_FLAGS)
else
args+=($NIX_RUST_LINK_FLAGS)
fi
touch invoke.log
echo "''${args[@]}" >>invoke.log
exec ${rustChannel}/bin/clippy-driver --deny warnings "''${args[@]}"
'';
buildEnv = (drv: {
rustc = drv.setBuildEnv;
@ -84,9 +55,10 @@ let
${drv.setBuildEnv or "" }
echo
echo --- BUILDING WITH CLIPPY ---
echo
echo
export RUSTC=${clippyBuilder}/bin/clippy
export NIX_RUST_BUILD_FLAGS="''${NIX_RUST_BUILD_FLAGS} --deny warnings"
export RUSTC="''${CLIPPY_DRIVER}"
'';
}.${compiler});
@ -97,7 +69,7 @@ let
You can have a complete list of the available options by looking at the overriden object, mkcrate:
https://github.com/cargo2nix/cargo2nix/blob/master/overlay/mkcrate.nix
*/
overrides = pkgs.rustBuilder.overrides.all ++ [
packageOverrides = pkgs: pkgs.rustBuilder.overrides.all ++ [
/*
[1] We add some logic to compile our crates with clippy, it provides us many additional lints
@ -113,12 +85,7 @@ let
As we do not want to consider the .git folder as part of the input source,
we ask the user (the CI often) to pass the value to Nix.
[4] We ship some parts of the code disabled by default by putting them behind a flag.
It speeds up the compilation (when the feature is not required) and released crates have less dependency by default (less attack surface, disk space, etc.).
But we want to ship these additional features when we release Garage.
In the end, we chose to exclude all features from debug builds while putting (all of) them in the release builds.
[5] We don't want libsodium-sys and zstd-sys to try to use pkgconfig to build against a system library.
[4] We don't want libsodium-sys and zstd-sys to try to use pkgconfig to build against a system library.
However the features to do so get activated for some reason (due to a bug in cargo2nix?),
so disable them manually here.
*/
@ -136,10 +103,6 @@ let
/* [1] */ setBuildEnv = (buildEnv drv);
/* [2] */ hardeningDisable = [ "pie" ];
};
overrideArgs = old: {
/* [4] */ features = [ "bundled-libs" "sled" "metrics" "k2v" ]
++ (if release then [ "kubernetes-discovery" "telemetry-otlp" "lmdb" "sqlite" ] else []);
};
})
(pkgs.rustBuilder.rustLib.makeOverride {
@ -190,18 +153,39 @@ let
(pkgs.rustBuilder.rustLib.makeOverride {
name = "libsodium-sys";
overrideArgs = old: {
features = [ ]; /* [5] */
features = [ ]; /* [4] */
};
})
(pkgs.rustBuilder.rustLib.makeOverride {
name = "zstd-sys";
overrideArgs = old: {
features = [ ]; /* [5] */
features = [ ]; /* [4] */
};
})
];
/*
We ship some parts of the code disabled by default by putting them behind a flag.
It speeds up the compilation (when the feature is not required) and released crates have less dependency by default (less attack surface, disk space, etc.).
But we want to ship these additional features when we release Garage.
In the end, we chose to exclude all features from debug builds while putting (all of) them in the release builds.
*/
rootFeatures = if features != null then features else
([
"garage/bundled-libs"
"garage/sled"
"garage/k2v"
] ++ (if release then [
"garage/consul-discovery"
"garage/kubernetes-discovery"
"garage/metrics"
"garage/telemetry-otlp"
"garage/lmdb"
"garage/sqlite"
] else []));
packageFun = import ../Cargo.nix;
/*
@ -222,23 +206,15 @@ let
"x86_64-unknown-linux-musl" = [ "target-feature=+crt-static" "link-arg=-static-pie" ];
};
in
/*
The following definition is not elegant as we use a low level function of Cargo2nix
that enables us to pass our custom rustChannel object. We need this low level definition
to pass Nix's Rust toolchains instead of Mozilla's one.
NixOS and Rust/Cargo triples do not match for ARM, fix it here.
*/
rustTarget = if target == "armv6l-unknown-linux-musleabihf"
then "arm-unknown-linux-musleabihf"
else target;
target is mandatory but must be kept to null to allow cargo2nix to set it to the appropriate value
for each crate.
*/
pkgs.rustBuilder.makePackageSet {
inherit packageFun rustChannel release codegenOpts;
packageOverrides = overrides;
target = null;
buildRustPackages = pkgs.buildPackages.rustBuilder.makePackageSet {
inherit rustChannel packageFun codegenOpts;
packageOverrides = overrides;
target = null;
};
}
in
pkgs.rustBuilder.makePackageSet ({
inherit release packageFun packageOverrides codegenOpts rootFeatures;
target = rustTarget;
} // toolchainOptions)

View file

@ -6,19 +6,24 @@ with import ./common.nix;
let
platforms = [
"x86_64-unknown-linux-musl"
#"x86_64-unknown-linux-musl"
"i686-unknown-linux-musl"
"aarch64-unknown-linux-musl"
#"aarch64-unknown-linux-musl"
"armv6l-unknown-linux-musleabihf"
];
pkgsList = builtins.map (target: import pkgsSrc {
inherit system;
crossSystem = { config = target; };
crossSystem = {
config = target;
isStatic = true;
};
overlays = [ cargo2nixOverlay ];
}) platforms;
pkgsHost = import pkgsSrc {};
lib = pkgsHost.lib;
kaniko = (import ./kaniko.nix) pkgsHost;
winscp = (import ./winscp.nix) pkgsHost;
manifestTool = (import ./manifest-tool.nix) pkgsHost;
in
lib.flatten (builtins.map (pkgs: [
pkgs.rustPlatform.rust.rustc
@ -27,5 +32,6 @@ in
]) pkgsList) ++ [
kaniko
winscp
manifestTool
]

View file

@ -15,9 +15,10 @@ data:
bootstrap_peers = {{ .Values.garage.bootstrapPeers }}
kubernetes_namespace = "{{ .Release.Namespace }}"
kubernetes_service_name = "{{ include "garage.fullname" . }}"
kubernetes_skip_crd = {{ .Values.garage.kubernetesSkipCrd }}
[kubernetes_discovery]
namespace = "{{ .Release.Namespace }}"
service_name = "{{ include "garage.fullname" . }}"
skip_crd = {{ .Values.garage.kubernetesSkipCrd }}
[s3_api]
s3_region = "{{ .Values.garage.s3.api.region }}"
@ -27,4 +28,4 @@ data:
[s3_web]
bind_addr = "[::]:3902"
root_domain = "{{ .Values.garage.s3.web.rootDomain }}"
index = "{{ .Values.garage.s3.web.index }}"
index = "{{ .Values.garage.s3.web.index }}"

View file

@ -8,7 +8,7 @@ SCRIPT_FOLDER="`dirname \"$0\"`"
REPO_FOLDER="${SCRIPT_FOLDER}/../"
GARAGE_DEBUG="${REPO_FOLDER}/target/debug/"
GARAGE_RELEASE="${REPO_FOLDER}/target/release/"
NIX_RELEASE="${REPO_FOLDER}/result/bin/"
NIX_RELEASE="${REPO_FOLDER}/result/bin/:${REPO_FOLDER}/result-bin/bin/"
PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH"
CMDOUT=/tmp/garage.cmd.tmp

View file

@ -15,20 +15,10 @@ let
in
{
/* --- Rust Shell ---
* Use it to compile Garage
*/
rust = pkgs.mkShell {
shellHook = ''
function refresh_toolchain {
nix copy \
--to 's3://nix?endpoint=garage.deuxfleurs.fr&region=garage&secret-key=/etc/nix/signing-key.sec' \
$(nix-store -qR \
$(nix-build --quiet --no-build-output --no-out-link nix/toolchain.nix))
}
'';
nativeBuildInputs = [
#pkgs.rustPlatform.rust.rustc
pkgs.rustPlatform.rust.cargo
@ -67,12 +57,33 @@ function refresh_toolchain {
*/
release = pkgs.mkShell {
shellHook = ''
function refresh_toolchain {
pass show deuxfleurs/nix_priv_key > /tmp/nix-signing-key.sec
nix copy \
--to 's3://nix?endpoint=garage.deuxfleurs.fr&region=garage&secret-key=/tmp/nix-signing-key.sec' \
$(nix-store -qR \
$(nix-build --no-build-output --no-out-link nix/toolchain.nix))
rm /tmp/nix-signing-key.sec
}
function refresh_cache {
pass show deuxfleurs/nix_priv_key > /tmp/nix-signing-key.sec
for attr in clippy.amd64 test.amd64 pkgs.{amd64,i386,arm,arm64}.{debug,release}; do
echo "Updating cache for ''${attr}"
derivation=$(nix-instantiate --attr ''${attr})
nix copy \
--to 's3://nix?endpoint=garage.deuxfleurs.fr&region=garage&secret-key=/tmp/nix-signing-key.sec' \
$(nix-store -qR ''${derivation%\!bin})
done
rm /tmp/nix-signing-key.sec
}
function to_s3 {
aws \
--endpoint-url https://garage.deuxfleurs.fr \
--region garage \
s3 cp \
./result/bin/garage \
./result-bin/bin/garage \
s3://garagehq.deuxfleurs.fr/_releases/''${DRONE_TAG:-$DRONE_COMMIT}/''${TARGET}/garage
}

View file

@ -36,7 +36,7 @@ sha2 = "0.10"
futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
pin-project = "1.0.11"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = "0.1"

View file

@ -5,7 +5,7 @@ use async_trait::async_trait;
use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{Body, Request, Response};
use hyper::{Body, Request, Response, StatusCode};
use opentelemetry::trace::SpanRef;
@ -69,7 +69,7 @@ impl AdminApiServer {
fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
Ok(Response::builder()
.status(204)
.status(StatusCode::NO_CONTENT)
.header(ALLOW, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
@ -94,7 +94,7 @@ impl AdminApiServer {
.ok_or_internal_error("Could not serialize metrics")?;
Ok(Response::builder()
.status(200)
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))?)
}

View file

@ -151,7 +151,7 @@ pub async fn handle_update_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
@ -171,7 +171,7 @@ pub async fn handle_apply_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
@ -186,7 +186,7 @@ pub async fn handle_revert_cluster_layout(
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}

View file

@ -42,7 +42,7 @@ pub async fn handle_insert_batch(
garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}

View file

@ -153,7 +153,7 @@ pub async fn handle_insert_item(
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}

View file

@ -1,9 +1,5 @@
use crate::*;
use crate::lmdb_adapter::LmdbDb;
use crate::sled_adapter::SledDb;
use crate::sqlite_adapter::SqliteDb;
fn test_suite(db: Db) {
let tree = db.open_tree("tree").unwrap();
@ -80,7 +76,10 @@ fn test_suite(db: Db) {
}
#[test]
#[cfg(feature = "lmdb")]
fn test_lmdb_db() {
use crate::lmdb_adapter::LmdbDb;
let path = mktemp::Temp::new_dir().unwrap();
let db = heed::EnvOpenOptions::new()
.max_dbs(100)
@ -92,7 +91,10 @@ fn test_lmdb_db() {
}
#[test]
#[cfg(feature = "sled")]
fn test_sled_db() {
use crate::sled_adapter::SledDb;
let path = mktemp::Temp::new_dir().unwrap();
let db = SledDb::init(sled::open(path.to_path_buf()).unwrap());
test_suite(db);
@ -100,7 +102,10 @@ fn test_sled_db() {
}
#[test]
#[cfg(feature = "sqlite")]
fn test_sqlite_db() {
use crate::sqlite_adapter::SqliteDb;
let db = SqliteDb::init(rusqlite::Connection::open_in_memory().unwrap());
test_suite(db);
}

View file

@ -30,9 +30,11 @@ garage_table = { version = "0.8.0", path = "../table" }
garage_util = { version = "0.8.0", path = "../util" }
garage_web = { version = "0.8.0", path = "../web" }
backtrace = "0.3"
bytes = "1.0"
bytesize = "1.1"
timeago = "0.3"
parse_duration = "2.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
@ -58,7 +60,7 @@ opentelemetry-otlp = { version = "0.10", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
aws-sdk-s3 = "0.8"
aws-sdk-s3 = "0.19"
chrono = "0.4"
http = "0.2"
hmac = "0.12"
@ -81,6 +83,8 @@ sled = [ "garage_model/sled" ]
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).

View file

@ -85,6 +85,9 @@ impl AdminRpcHandler {
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
BucketOperation::CleanupIncompleteUploads(query) => {
self.handle_bucket_cleanup_incomplete_uploads(query).await
}
}
}
@ -512,6 +515,42 @@ impl AdminRpcHandler {
)))
}
async fn handle_bucket_cleanup_incomplete_uploads(
&self,
query: &CleanupIncompleteUploadsOpt,
) -> Result<AdminRpc, Error> {
let mut bucket_ids = vec![];
for b in query.buckets.iter() {
bucket_ids.push(
self.garage
.bucket_helper()
.resolve_global_bucket_name(b)
.await?
.ok_or_bad_request(format!("Bucket not found: {}", b))?,
);
}
let duration = parse_duration::parse::parse(&query.older_than)
.ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
let mut ret = String::new();
for bucket in bucket_ids {
let count = self
.garage
.bucket_helper()
.cleanup_incomplete_uploads(&bucket, duration)
.await?;
writeln!(
&mut ret,
"Bucket {:?}: {} incomplete uploads aborted",
bucket, count
)
.unwrap();
}
Ok(AdminRpc::Ok(ret))
}
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,

View file

@ -200,6 +200,10 @@ pub enum BucketOperation {
/// Set the quotas for this bucket
#[structopt(name = "set-quotas", version = garage_version())]
SetQuotas(SetQuotasOpt),
/// Clean up (abort) old incomplete multipart uploads
#[structopt(name = "cleanup-incomplete-uploads", version = garage_version())]
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@ -301,6 +305,17 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
pub older_than: String,
/// Name of bucket(s) to clean up
#[structopt(required = true)]
pub buckets: Vec<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys

View file

@ -65,21 +65,6 @@ struct Opt {
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
// Abort on panic (same behavior as in Go)
std::panic::set_hook(Box::new(|panic_info| {
error!("{}", panic_info.to_string());
std::process::abort();
}));
// Initialize version and features info
let features = &[
#[cfg(feature = "k2v")]
@ -90,6 +75,8 @@ async fn main() {
"lmdb",
#[cfg(feature = "sqlite")]
"sqlite",
#[cfg(feature = "consul-discovery")]
"consul-discovery",
#[cfg(feature = "kubernetes-discovery")]
"kubernetes-discovery",
#[cfg(feature = "metrics")]
@ -106,12 +93,51 @@ async fn main() {
}
garage_util::version::init_features(features);
// Parse arguments
let version = format!(
"{} [features: {}]",
garage_util::version::garage_version(),
features.join(", ")
);
// Initialize panic handler that aborts on panic and shows a nice message.
// By default, Tokio continues runing normally when a task panics. We want
// to avoid this behavior in Garage as this would risk putting the process in an
// unknown/uncontrollable state. We prefer to exit the process and restart it
// from scratch, so that it boots back into a fresh, known state.
let panic_version_info = version.clone();
std::panic::set_hook(Box::new(move |panic_info| {
eprintln!("======== PANIC (internal Garage error) ========");
eprintln!("{}", panic_info);
eprintln!();
eprintln!("Panics are internal errors that Garage is unable to handle on its own.");
eprintln!("They can be caused by bugs in Garage's code, or by corrupted data in");
eprintln!("the node's storage. If you feel that this error is likely to be a bug");
eprintln!("in Garage, please report it on our issue tracker a the following address:");
eprintln!();
eprintln!(" https://git.deuxfleurs.fr/Deuxfleurs/garage/issues");
eprintln!();
eprintln!("Please include the last log messages and the the full backtrace below in");
eprintln!("your bug report, as well as any relevant information on the context in");
eprintln!("which Garage was running when this error occurred.");
eprintln!();
eprintln!("GARAGE VERSION: {}", panic_version_info);
eprintln!();
eprintln!("BACKTRACE:");
eprintln!("{:?}", backtrace::Backtrace::new());
std::process::abort();
}));
// Initialize logging as well as other libraries used in Garage
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
// Parse arguments and dispatch command line
let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
let res = match opt.cmd {

View file

@ -6,7 +6,7 @@ use assert_json_diff::assert_json_eq;
use serde_json::json;
use super::json_body;
use hyper::Method;
use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_batch() {
@ -49,7 +49,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
for sk in ["a", "b", "c", "d.1", "d.2", "e"] {
let res = ctx
@ -62,7 +62,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -104,7 +104,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@ -266,7 +266,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
for sk in ["b", "c", "d.1", "d.2"] {
let res = ctx
@ -280,9 +280,9 @@ async fn test_batch() {
.await
.unwrap();
if sk == "b" {
assert_eq!(res.status(), 204);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
} else {
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
}
ct.insert(
sk,
@ -317,7 +317,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@ -478,7 +478,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,
@ -514,7 +514,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -547,7 +547,7 @@ async fn test_batch() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
let json_res = json_body(res).await;
assert_json_eq!(
json_res,

View file

@ -1,13 +1,13 @@
use crate::common;
use hyper::Method;
use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_error_codes() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-k2v-error-codes");
// Regular insert should work (code 200)
// Regular insert should work (code 204)
let res = ctx
.k2v
.request
@ -19,7 +19,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Insert with trash causality token: invalid request
let res = ctx
@ -34,7 +34,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search without partition key: invalid request
let res = ctx
@ -52,7 +52,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search with start that is not in prefix: invalid request
let res = ctx
@ -70,7 +70,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Search with invalid json: 400
let res = ctx
@ -88,7 +88,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Batch insert with invalid causality token: 400
let res = ctx
@ -105,7 +105,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Batch insert with invalid data: 400
let res = ctx
@ -122,7 +122,7 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
// Poll with invalid causality token: 400
let res = ctx
@ -137,5 +137,5 @@ async fn test_error_codes() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 400);
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
}

View file

@ -6,7 +6,7 @@ use assert_json_diff::assert_json_eq;
use serde_json::json;
use super::json_body;
use hyper::Method;
use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_items_and_indices() {
@ -56,7 +56,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@ -69,7 +69,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -132,7 +132,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@ -145,7 +145,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -201,7 +201,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Get value back
let res = ctx
@ -214,7 +214,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -271,7 +271,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
let ct = res
.headers()
.get("x-garage-causality-token")
@ -292,7 +292,7 @@ async fn test_items_and_indices() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff
tokio::time::sleep(Duration::from_secs(1)).await;
@ -364,7 +364,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@ -377,7 +377,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -405,7 +405,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -424,7 +424,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/octet-stream"
@ -446,7 +446,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -466,7 +466,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@ -479,7 +479,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -503,7 +503,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -528,7 +528,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 409); // CONFLICT
assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT
// f3: json
let res = ctx
@ -541,7 +541,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -568,7 +568,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@ -581,7 +581,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -599,7 +599,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -625,7 +625,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 409); // CONFLICT
assert_eq!(res.status(), StatusCode::CONFLICT); // CONFLICT
// f3: json
let res = ctx
@ -638,7 +638,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -658,7 +658,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// f0: either
let res = ctx
@ -671,7 +671,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204); // NO CONTENT
assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT
// f1: not specified
let res = ctx
@ -683,7 +683,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"
@ -702,7 +702,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 204); // NO CONTENT
assert_eq!(res.status(), StatusCode::NO_CONTENT); // NO CONTENT
// f3: json
let res = ctx
@ -715,7 +715,7 @@ async fn test_item_return_format() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers().get("content-type").unwrap().to_str().unwrap(),
"application/json"

View file

@ -1,4 +1,4 @@
use hyper::Method;
use hyper::{Method, StatusCode};
use std::time::Duration;
use crate::common;
@ -20,7 +20,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Retrieve initial value to get its causality token
let res2 = ctx
@ -33,7 +33,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
assert_eq!(res2.status(), 200);
assert_eq!(res2.status(), StatusCode::OK);
let ct = res2
.headers()
.get("x-garage-causality-token")
@ -80,7 +80,7 @@ async fn test_poll() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// Check poll finishes with correct value
let poll_res = tokio::select! {
@ -88,7 +88,7 @@ async fn test_poll() {
res = poll => res.unwrap().unwrap(),
};
assert_eq!(poll_res.status(), 200);
assert_eq!(poll_res.status(), StatusCode::OK);
let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
.await

View file

@ -1,6 +1,6 @@
use crate::common;
use hyper::Method;
use hyper::{Method, StatusCode};
#[tokio::test]
async fn test_simple() {
@ -18,7 +18,7 @@ async fn test_simple() {
.send()
.await
.unwrap();
assert_eq!(res.status(), 200);
assert_eq!(res.status(), StatusCode::NO_CONTENT);
let res2 = ctx
.k2v
@ -30,7 +30,7 @@ async fn test_simple() {
.send()
.await
.unwrap();
assert_eq!(res2.status(), 200);
assert_eq!(res2.status(), StatusCode::OK);
let res2_body = hyper::body::to_bytes(res2.into_body())
.await

View file

@ -4,7 +4,7 @@ use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
types::ByteStream,
};
use http::Request;
use http::{Request, StatusCode};
use hyper::{
body::{to_bytes, Body},
Client,
@ -43,7 +43,7 @@ async fn test_website() {
let mut resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 404);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@ -56,7 +56,7 @@ async fn test_website() {
.expect_success_status("Could not allow website on bucket");
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@ -69,7 +69,7 @@ async fn test_website() {
.expect_success_status("Could not deny website on bucket");
resp = client.request(req()).await.unwrap();
assert_eq!(resp.status(), 404);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@ -175,7 +175,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
@ -200,7 +200,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 404);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()
@ -220,7 +220,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 200);
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("access-control-allow-origin").unwrap(),
"*"
@ -244,7 +244,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 403);
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@ -285,7 +285,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 403);
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
@ -311,7 +311,7 @@ async fn test_website_s3_api() {
let mut resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), 404);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!(
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY_ERR.as_ref()

View file

@ -12,7 +12,7 @@ readme = "../../README.md"
base64 = "0.13.0"
http = "0.2.6"
log = "0.4"
rusoto_core = "0.48.0"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
serde = "1.0.137"

View file

@ -1,3 +1,5 @@
use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
@ -12,7 +14,7 @@ use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
use crate::s3::object_table::ObjectFilter;
use crate::s3::object_table::*;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@ -472,4 +474,69 @@ impl<'a> BucketHelper<'a> {
Ok(true)
}
// ----
/// Deletes all incomplete multipart uploads that are older than a certain time.
/// Returns the number of uploads aborted
pub async fn cleanup_incomplete_uploads(
&self,
bucket_id: &Uuid,
older_than: Duration,
) -> Result<usize, Error> {
let older_than = now_msec() - older_than.as_millis() as u64;
let mut ret = 0usize;
let mut start = None;
loop {
let objects = self
.0
.object_table
.get_range(
bucket_id,
start,
Some(ObjectFilter::IsUploading),
1000,
EnumerationOrder::Forward,
)
.await?;
let abortions = objects
.iter()
.filter_map(|object| {
let aborted_versions = object
.versions()
.iter()
.filter(|v| v.is_uploading() && v.timestamp < older_than)
.map(|v| ObjectVersion {
state: ObjectVersionState::Aborted,
uuid: v.uuid,
timestamp: v.timestamp,
})
.collect::<Vec<_>>();
if !aborted_versions.is_empty() {
Some(Object::new(
object.bucket_id,
object.key.clone(),
aborted_versions,
))
} else {
None
}
})
.collect::<Vec<_>>();
ret += abortions.len();
self.0.object_table.insert_many(abortions).await?;
if objects.len() < 1000 {
break;
} else {
start = Some(objects.last().unwrap().key.clone());
}
}
Ok(ret)
}
}

View file

@ -30,12 +30,13 @@ rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
err-derive = { version = "0.3", optional = true }
# newer version requires rust edition 2021
kube = { version = "0.62", features = ["runtime", "derive"], optional = true }
k8s-openapi = { version = "0.13", features = ["v1_22"], optional = true }
openssl = { version = "0.10", features = ["vendored"], optional = true }
kube = { version = "0.75", default-features = false, features = ["runtime", "derive", "client", "rustls-tls"], optional = true }
k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
schemars = { version = "0.8", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
# newer version requires rust edition 2021
pnet_datalink = "0.28"
@ -48,9 +49,7 @@ opentelemetry = "0.17"
netapp = { version = "0.5.2", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
consul-discovery = [ "reqwest", "err-derive" ]
system-libs = [ "sodiumoxide/use-pkg-config" ]

View file

@ -1,16 +1,14 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use hyper::client::Client;
use hyper::StatusCode;
use hyper::{Body, Method, Request};
use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::error::Error;
// ---- READING FROM CONSUL CATALOG ----
use garage_util::config::ConsulDiscoveryConfig;
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
@ -22,53 +20,6 @@ struct ConsulQueryEntry {
node_meta: HashMap<String, String>,
}
pub async fn get_consul_nodes(
consul_host: &str,
consul_service_name: &str,
) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
let url = format!(
"http://{}/v1/catalog/service/{}",
consul_host, consul_service_name
);
let req = Request::builder()
.uri(url)
.method(Method::GET)
.body(Body::default())?;
let client = Client::new();
let resp = client.request(req).await?;
if resp.status() != StatusCode::OK {
return Err(Error::Message(format!("HTTP error {}", resp.status())));
}
let body = hyper::body::to_bytes(resp.into_body()).await?;
let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry {
#[serde(rename = "Node")]
@ -95,57 +46,134 @@ struct ConsulPublishService {
port: u16,
}
pub async fn publish_consul_service(
consul_host: &str,
consul_service_name: &str,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), Error> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
// ----
let advertisment = ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
service_id: node.clone(),
service_name: consul_service_name.to_string(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
};
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
}
let url = format!("http://{}/v1/catalog/register", consul_host);
let req_body = serde_json::to_string(&advertisment)?;
debug!("Request body for consul adv: {}", req_body);
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let client = match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let req = Request::builder()
.uri(url)
.method(Method::PUT)
.body(Body::from(req_body))?;
let mut client_key_buf = vec![];
File::open(client_key)?.read_to_end(&mut client_key_buf)?;
let client = Client::new();
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
let resp = client.request(req).await?;
debug!("Response of advertising to Consul: {:?}", resp);
let resp_code = resp.status();
let resp_bytes = &hyper::body::to_bytes(resp.into_body()).await?;
debug!(
"{}",
std::str::from_utf8(resp_bytes).unwrap_or("<invalid utf8>")
);
if config.tls_skip_verify {
reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
if resp_code != StatusCode::OK {
return Err(Error::Message(format!("HTTP error {}", resp_code)));
reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
}
}
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
};
Ok(Self { client, config })
}
Ok(())
// ---- READING FROM CONSUL CATALOG ----
pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"{}/v1/catalog/service/{}",
self.config.consul_http_addr, self.config.service_name
);
let http = self.client.get(&url).send().await?;
let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
pub async fn publish_consul_service(
&self,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let advertisement = ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
};
let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
let http = self.client.put(&url).json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
}
}
/// Regroup all Consul discovery errors
#[derive(Debug, Error)]
pub enum ConsulError {
#[error(display = "IO error: {}", _0)]
Io(#[error(source)] std::io::Error),
#[error(display = "HTTP error: {}", _0)]
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig,
}

View file

@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::config::KubernetesDiscoveryConfig;
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
@ -41,15 +43,14 @@ pub async fn create_kubernetes_crd() -> Result<(), kube::Error> {
}
pub async fn get_kubernetes_nodes(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
kubernetes_config: &KubernetesDiscoveryConfig,
) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
let lp = ListParams::default().labels(&format!(
"garage.{}/service={}",
K8S_GROUP, kubernetes_service_name
K8S_GROUP, kubernetes_config.service_name
));
let nodes = nodes.list(&lp).await?;
@ -73,8 +74,7 @@ pub async fn get_kubernetes_nodes(
}
pub async fn publish_kubernetes_node(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
kubernetes_config: &KubernetesDiscoveryConfig,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
@ -93,13 +93,13 @@ pub async fn publish_kubernetes_node(
let labels = node.metadata.labels.insert(BTreeMap::new());
labels.insert(
format!("garage.{}/service", K8S_GROUP),
kubernetes_service_name.to_string(),
kubernetes_config.service_name.to_string(),
);
debug!("Node object to be applied: {:#?}", node);
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), &kubernetes_config.namespace);
if let Ok(old_node) = nodes.get(&node_pubkey).await {
node.metadata.resource_version = old_node.metadata.resource_version;

View file

@ -3,6 +3,7 @@
#[macro_use]
extern crate tracing;
#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;

View file

@ -23,12 +23,15 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::*;
#[cfg(feature = "consul-discovery")]
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
@ -90,12 +93,14 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>,
consul_discovery: Option<ConsulDiscoveryParam>,
#[cfg(feature = "consul-discovery")]
consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryParam>,
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
replication_factor: usize,
@ -285,29 +290,21 @@ impl System {
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
let consul_discovery = match (&config.consul_host, &config.consul_service_name) {
(Some(ch), Some(csn)) => Some(ConsulDiscoveryParam {
consul_host: ch.to_string(),
service_name: csn.to_string(),
}),
_ => None,
};
#[cfg(feature = "kubernetes-discovery")]
let kubernetes_discovery = match (
&config.kubernetes_service_name,
&config.kubernetes_namespace,
) {
(Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam {
service_name: ksn.to_string(),
namespace: kn.to_string(),
skip_crd: config.kubernetes_skip_crd,
}),
_ => None,
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
ConsulDiscovery::new(cfg.clone())
.ok_or_message("Invalid Consul discovery configuration")?,
),
None => None,
};
#[cfg(not(feature = "consul-discovery"))]
if config.consul_discovery.is_some() {
warn!("Consul discovery is not enabled in this build.");
}
#[cfg(not(feature = "kubernetes-discovery"))]
if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() {
if config.kubernetes_discovery.is_some() {
warn!("Kubernetes discovery is not enabled in this build.");
}
@ -329,11 +326,13 @@ impl System {
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")]
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery,
kubernetes_discovery: config.kubernetes_discovery.clone(),
ring,
update_ring: Mutex::new(update_ring),
@ -432,6 +431,7 @@ impl System {
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
let c = match &self.consul_discovery {
Some(c) => c,
@ -446,9 +446,7 @@ impl System {
}
};
publish_consul_service(
&c.consul_host,
&c.service_name,
c.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@ -473,8 +471,7 @@ impl System {
};
publish_kubernetes_node(
&k.service_name,
&k.namespace,
k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
@ -644,8 +641,9 @@ impl System {
}
// Fetch peer list from Consul
#[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery {
match get_consul_nodes(&c.consul_host, &c.service_name).await {
match c.get_consul_nodes().await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@ -667,7 +665,7 @@ impl System {
};
}
match get_kubernetes_nodes(&k.service_name, &k.namespace).await {
match get_kubernetes_nodes(k).await {
Ok(node_list) => {
ping_list.extend(node_list);
}
@ -691,6 +689,7 @@ impl System {
warn!("Could not save peer list to file: {}", e);
}
#[cfg(feature = "consul-discovery")]
self.background.spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
@ -785,15 +784,3 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
struct ConsulDiscoveryParam {
consul_host: String,
service_name: String,
}
#[cfg(feature = "kubernetes-discovery")]
struct KubernetesDiscoveryParam {
service_name: String,
namespace: String,
skip_crd: bool,
}

View file

@ -46,20 +46,17 @@ pub struct Config {
/// Timeout for Netapp RPC calls
pub rpc_timeout_msec: Option<u64>,
// -- Bootstraping and discovery
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,
/// Consul host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
/// Kubernetes namespace the service discovery resources are be created in
pub kubernetes_namespace: Option<String>,
/// Service name to filter for in k8s custom resources
pub kubernetes_service_name: Option<String>,
/// Skip creation of the garagenodes CRD
/// Configuration for automatic node discovery through Consul
#[serde(default)]
pub kubernetes_skip_crd: bool,
pub consul_discovery: Option<ConsulDiscoveryConfig>,
/// Configuration for automatic node discovery through Kubernetes
#[serde(default)]
pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
// -- DB
/// Database engine to use for metadata (options: sled, sqlite, lmdb)
@ -129,6 +126,34 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig {
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Consul service name to use
pub service_name: String,
/// CA TLS certificate to use when connecting to Consul
pub ca_cert: Option<String>,
/// Client TLS certificate to use when connecting to Consul
pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul
pub client_key: Option<String>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
}
#[derive(Deserialize, Debug, Clone)]
pub struct KubernetesDiscoveryConfig {
/// Kubernetes namespace the service discovery resources are be created in
pub namespace: String,
/// Service name to filter for in k8s custom resources
pub service_name: String,
/// Skip creation of the garagenodes CRD
#[serde(default)]
pub skip_crd: bool,
}
fn default_db_engine() -> String {
"sled".into()
}

View file

@ -318,7 +318,7 @@ fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
}
Some(_) => match path_utf8 {
Cow::Borrowed(pu8) => Ok((&pu8[1..]).into()),
Cow::Owned(pu8) => Ok((&pu8[1..]).to_string().into()),
Cow::Owned(pu8) => Ok(pu8[1..].to_string().into()),
},
}
}