Compare commits

...

64 commits

Author SHA1 Message Date
0e05b9bc77
cargo fmt 2021-04-17 20:30:43 +02:00
163ee977a0
Small improvements to compression code 2021-04-14 23:37:41 +02:00
307858bf0a Merge pull request 'Compress with zstd' (#44) from trinity-1686a/garage:zstd-block into main
Reviewed-on: Deuxfleurs/garage#44
2021-04-14 23:27:33 +02:00
76efb4999d
re-disable drone caches 2021-04-08 20:13:28 +02:00
6e28ea6bdc
Use pre-prepared Docker image in CI to speed things up 2021-04-08 20:06:48 +02:00
7ba80b6108
Update drone CI signature 2021-04-08 19:55:20 +02:00
e2e8ea4df5 Merge pull request 'Make CI faster' (#57) from trinity-1686a/garage:improve-ci into main
Reviewed-on: Deuxfleurs/garage#57
2021-04-08 19:55:07 +02:00
992284d545 use volume for cargo home
so it gets preserved between steps, and registry download is done only once
2021-04-08 17:08:26 +02:00
d4fd074000 use zstd checksumming 2021-04-08 15:44:08 +02:00
aeadb071c6 delete plain block when getting a compressed one 2021-04-08 15:42:14 +02:00
4d5263ccf3 change extention for compressed files
set_extension don't behave verywell with extensions containing multiple dots
2021-04-08 15:42:14 +02:00
e4b66a9e28 add testing on compressible data in smoke test 2021-04-08 15:42:14 +02:00
9215d63e3a add checksum to end of file instead of decompressing 2021-04-08 15:42:12 +02:00
01c623e391 use BlockData instead of Vec+bool 2021-04-08 15:41:27 +02:00
de8e47435f compress blocs with zstd 2021-04-08 15:38:31 +02:00
7560a41ab1
fix drone CI signature 2021-04-08 15:30:07 +02:00
898fd1dc81
Skip Drone CI caching 2021-04-08 15:29:22 +02:00
af6774a511
change a few comments 2021-04-08 15:13:02 +02:00
c35c472dc9 Merge pull request 'add doc comments' (#53) from trinity-1686a/garage:doc-comments into main
Reviewed-on: Deuxfleurs/garage#53
2021-04-08 15:01:13 +02:00
718ae00548 change some more comments and revert changes on TableSchema 2021-04-07 13:39:34 +02:00
c3bd672d58
Reorganize code in s3_put.rs 2021-04-07 10:00:47 +02:00
93a9f96130
remove useless comment 2021-04-06 23:38:30 +02:00
7380f3855c Merge pull request 'Use content defined chunking' (#43) from trinity-1686a/garage:content-defined-chunking into main
Reviewed-on: Deuxfleurs/garage#43
2021-04-06 22:18:41 +02:00
6cbc8d6ec9 mark branch as unreachable 2021-04-06 16:53:39 +02:00
c8906f200b make most requested changes 2021-04-06 05:28:47 +02:00
653d3d588f document garage crate 2021-04-06 05:26:48 +02:00
a74eccfd6e document api crate 2021-04-06 05:26:48 +02:00
ee9b685994 document web crate 2021-04-06 05:26:48 +02:00
92d54770bb attempt at documenting model crate 2021-04-06 05:26:48 +02:00
30bec0758b attempt at documenting table crate 2021-04-06 05:26:48 +02:00
b476b702c8 run cargo fmt on util and make missing doc warning 2021-04-06 05:26:47 +02:00
7d3a951836 document rpc crate 2021-04-06 05:26:36 +02:00
f4bf987627 document util crate 2021-04-06 05:25:35 +02:00
b3b0b20d72 run fmt 2021-04-06 02:54:05 +02:00
47d0aee9f8 change crate used for cdc
previous one seemed to output incorrect results
2021-04-06 02:50:07 +02:00
6e0cb2dfb6 add content defined chuking 2021-04-06 02:50:07 +02:00
c4c4b7dedc
Update documentation 2021-04-06 00:01:49 +02:00
8225fa2e4b Merge pull request 'Improved bootstraping procedure' (#56) from better_bootstrap into main
Reviewed-on: Deuxfleurs/garage#56
2021-04-05 23:50:33 +02:00
ab67bd88de
Try to fix Drone 2021-04-05 23:41:50 +02:00
0f192a96b5
small simplify 2021-04-05 23:21:25 +02:00
7b85056942
Merge discovery loop with consul 2021-04-05 23:04:08 +02:00
7fd1f9a869
cargo fmt 2021-04-05 20:42:57 +02:00
c5d8dc7d6d
Print stats 2021-04-05 20:42:46 +02:00
fa11cb746a
Cargo fmt 2021-04-05 20:35:26 +02:00
f11bd80d2a
Keep old data 2021-04-05 20:33:24 +02:00
595dc0ed0d
Persist directly and not in background 2021-04-05 20:26:01 +02:00
78eeaab5ed
Install rustfmt 2021-04-05 20:00:48 +02:00
22fbb3b892
Fix Drone CI signature 2021-04-05 19:58:42 +02:00
0eb5baea1a
Improve bootstraping: do it regularly; persist peer list 2021-04-05 19:55:53 +02:00
7d772737a5 Add signature to Drone CI 2021-04-05 12:10:28 +02:00
4c2d8f5a96 test2 2021-04-05 12:09:20 +02:00
0325086dac test 2021-04-05 12:08:05 +02:00
700925263f
Drone CI badge on branch main 2021-04-05 11:45:20 +02:00
6c83f66700 fix command for adding node 2021-04-02 23:23:14 +02:00
55e4a93bad Section on recovering from failures 2021-04-02 19:30:29 +02:00
LUXEY Adrien
9cb9945131 [doc] Added mention that GarageHQ is hosted with Garage 2021-03-22 16:19:58 +01:00
7f9c1d5595 publish website only in correct repo 2021-03-22 12:49:21 +01:00
991cf1b818 fix typos 2021-03-19 17:55:10 +01:00
e26cb2640d Merge remote-tracking branch 'origin/feature/mdbook' 2021-03-19 17:49:57 +01:00
22041e924b Merge pull request 'intro.md: fix some typos, errors & dead links, plus some stylistic stuff' (#51) from adrien/garage:feature/mdbook into feature/mdbook
Reviewed-on: Deuxfleurs/garage#51
2021-03-19 17:14:41 +01:00
LUXEY Adrien
b119e9d3c4 intro.md: fix some typos, errors & dead links, plus some stylistic stuff
modifié :         doc/book/src/intro.md
2021-03-19 16:51:05 +01:00
3e6534d7a8 Fix garage_util description 2021-03-18 11:46:29 +01:00
2dae4a25d6 Fix some typos 2021-03-18 11:35:50 +01:00
fcd566e89d Fix a table in the doc 2021-03-18 11:26:02 +01:00
61 changed files with 1385 additions and 520 deletions

View file

@ -1,12 +1,20 @@
---
kind: pipeline
name: default
workspace:
base: /drone/garage
volumes:
- name: cargo_home
temp: {}
steps:
- name: restore-cache
image: meltwater/drone-cache:dev
volumes:
- name: cargo_home
path: /drone/cargo
environment:
AWS_ACCESS_KEY_ID:
from_secret: cache_aws_access_key_id
@ -20,35 +28,41 @@ steps:
cache_key: '{{ .Repo.Name }}_{{ checksum "Cargo.lock" }}_{{ arch }}_{{ os }}_gzip'
region: garage
mount:
- '/drone/cargo'
- 'target'
- '/drone/cargo/registry/index'
- '/drone/cargo/registry/cache'
- '/drone/cargo/bin'
- '/drone/cargo/git/db'
path_style: true
endpoint: https://garage.deuxfleurs.fr
when:
branch:
- nonexistent_skip_this_step
- name: build
image: rust:buster
image: lxpz/garage_builder_amd64:1
volumes:
- name: cargo_home
path: /drone/cargo
environment:
CARGO_HOME: /drone/cargo
commands:
- apt-get update
- apt-get install --yes libsodium-dev
- pwd
- cargo fmt -- --check
- cargo build
- name: cargo-test
image: rust:buster
image: lxpz/garage_builder_amd64:1
volumes:
- name: cargo_home
path: /drone/cargo
environment:
CARGO_HOME: /drone/cargo
commands:
- apt-get update
- apt-get install --yes libsodium-dev
- cargo test
- name: rebuild-cache
image: meltwater/drone-cache:dev
volumes:
- name: cargo_home
path: /drone/cargo
environment:
AWS_ACCESS_KEY_ID:
from_secret: cache_aws_access_key_id
@ -62,22 +76,22 @@ steps:
cache_key: '{{ .Repo.Name }}_{{ checksum "Cargo.lock" }}_{{ arch }}_{{ os }}_gzip'
region: garage
mount:
- '/drone/cargo'
- 'target'
- '/drone/cargo/registry/index'
- '/drone/cargo/registry/cache'
- '/drone/cargo/git/db'
- '/drone/cargo/bin'
path_style: true
endpoint: https://garage.deuxfleurs.fr
when:
branch:
- nonexistent_skip_this_step
- name: smoke-test
image: rust:buster
image: lxpz/garage_builder_amd64:1
volumes:
- name: cargo_home
path: /drone/cargo
environment:
CARGO_HOME: /drone/cargo
commands:
- apt-get update
- apt-get install --yes libsodium-dev awscli python-pip
- pip install s3cmd
- ./script/test-smoke.sh || (cat /tmp/garage.log; false)
---
@ -106,6 +120,15 @@ steps:
endpoint: https://garage.deuxfleurs.fr
region: garage
when:
event:
- push
branch:
- main
repo:
- Deuxfleurs/garage
---
kind: signature
hmac: de82026387bd09e547dbc9cc5d232fd865204b4f393d32508c50b58f8e60611d
...

130
Cargo.lock generated
View file

@ -95,6 +95,9 @@ name = "cc"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
dependencies = [
"jobserver",
]
[[package]]
name = "cfg-if"
@ -221,6 +224,12 @@ dependencies = [
"synstructure",
]
[[package]]
name = "fastcdc"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f"
[[package]]
name = "fnv"
version = "1.0.7"
@ -365,7 +374,7 @@ dependencies = [
"hex",
"log",
"pretty_env_logger",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"sled",
@ -383,6 +392,7 @@ dependencies = [
"chrono",
"crypto-mac 0.10.0",
"err-derive",
"fastcdc",
"futures",
"futures-util",
"garage_model",
@ -397,6 +407,7 @@ dependencies = [
"log",
"md-5",
"percent-encoding",
"rand 0.7.3",
"roxmltree",
"sha2",
"tokio",
@ -415,12 +426,13 @@ dependencies = [
"garage_util",
"hex",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"serde_bytes",
"sled",
"tokio",
"zstd",
]
[[package]]
@ -459,7 +471,7 @@ dependencies = [
"garage_util",
"hexdump",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"serde_bytes",
@ -479,7 +491,7 @@ dependencies = [
"http",
"hyper",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"rustls",
"serde",
@ -529,6 +541,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "getrandom"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.2"
@ -537,7 +560,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
@ -760,6 +783,15 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "jobserver"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.49"
@ -1043,6 +1075,19 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom 0.1.16",
"libc",
"rand_chacha 0.2.2",
"rand_core 0.5.1",
"rand_hc 0.2.0",
]
[[package]]
name = "rand"
version = "0.8.3"
@ -1050,9 +1095,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
"rand_chacha 0.3.0",
"rand_core 0.6.2",
"rand_hc 0.3.0",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core 0.5.1",
]
[[package]]
@ -1062,7 +1117,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.6.2",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom 0.1.16",
]
[[package]]
@ -1071,7 +1135,16 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom",
"getrandom 0.2.2",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core 0.5.1",
]
[[package]]
@ -1080,7 +1153,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core",
"rand_core 0.6.2",
]
[[package]]
@ -1581,6 +1654,12 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
@ -1703,3 +1782,32 @@ name = "xxhash-rust"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded"
[[package]]
name = "zstd"
version = "0.6.1+zstd.1.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "3.0.1+zstd.1.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.4.20+zstd.1.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e"
dependencies = [
"cc",
"libc",
]

View file

@ -2,7 +2,7 @@ BIN=target/release/garage
DOCKER=lxpz/garage_amd64
all:
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
clear; cargo build
$(BIN):
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features

View file

@ -1,4 +1,4 @@
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg?ref=refs/heads/main)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
===
<p align="center" style="text-align:center;">

View file

@ -14,6 +14,7 @@
- [Host a website](./cookbook/website.md)
- [Integrate as a media backend]()
- [Operate a cluster]()
- [Recovering from failures](./cookbook/recovering.md)
- [Reference Manual](./reference_manual/index.md)
- [Garage CLI]()

View file

@ -2,4 +2,4 @@
A cookbook, when you cook, is a collection of recipes.
Similarly, Garage's cookbook contains a collection of recipes that are known to works well!
This chapter could also be referred as "Tutorials" or "Best practises".
This chapter could also be referred as "Tutorials" or "Best practices".

View file

@ -0,0 +1,99 @@
# Recovering from failures
Garage is meant to work on old, second-hand hardware.
In particular, this makes it likely that some of your drives will fail, and some manual intervention will be needed.
Fear not! For Garage is fully equipped to handle drive failures, in most common cases.
## A note on availability of Garage
With nodes dispersed in 3 datacenters or more, here are the guarantees Garage provides with the default replication strategy (3 copies of all data, which is the recommended value):
- The cluster remains fully functional as long as the machines that fail are in only one datacenter. This includes a whole datacenter going down due to power/Internet outage.
- No data is lost as long as the machines that fail are in at most two datacenters.
Of course this only works if your Garage nodes are correctly configured to be aware of the datacenter in which they are located.
Make sure this is the case using `garage status` to check on the state of your cluster's configuration.
## First option: removing a node
If you don't have spare parts (HDD, SDD) to replace the failed component, and if there are enough remaining nodes in your cluster
(at least 3), you can simply remove the failed node from Garage's configuration.
Note that if you **do** intend to replace the failed parts by new ones, using this method followed by adding back the node is **not recommended** (although it should work),
and you should instead use one of the methods detailed in the next sections.
Removing a node is done with the following command:
```
garage node remove --yes <node_id>
```
(you can get the `node_id` of the failed node by running `garage status`)
This will repartition the data and ensure that 3 copies of everything are present on the nodes that remain available.
## Replacement scenario 1: only data is lost, metadata is fine
The recommended deployment for Garage uses an SSD to store metadata, and an HDD to store blocks of data.
In the case where only a single HDD crashes, the blocks of data are lost but the metadata is still fine.
This is very easy to recover by setting up a new HDD to replace the failed one.
The node does not need to be fully replaced and the configuration doesn't need to change.
We just need to tell Garage to get back all the data blocks and store them on the new HDD.
First, set up a new HDD to store Garage's data directory on the failed node, and restart Garage using
the existing configuration. Then, run:
```
garage repair -a --yes blocks
```
This will re-synchronize blocks of data that are missing to the new HDD, reading them from copies located on other nodes.
You can check on the advancement of this process by doing the following command:
```
garage stats -a
```
Look out for the following output:
```
Block manager stats:
resync queue length: 26541
```
This indicates that one of the Garage node is in the process of retrieving missing data from other nodes.
This number decreases to zero when the node is fully synchronized.
## Replacement scenario 2: metadata (and possibly data) is lost
This scenario covers the case where a full node fails, i.e. both the metadata directory and
the data directory are lost, as well as the case where only the metadata directory is lost.
To replace the lost node, we will start from an empty metadata directory, which means
Garage will generate a new node ID for the replacement node.
We will thus need to remove the previous node ID from Garage's configuration and replace it by the ID of the new node.
If your data directory is stored on a separate drive and is still fine, you can keep it, but it is not necessary to do so.
In all cases, the data will be rebalanced and the replacement node will not store the same pieces of data
as were originally stored on the one that failed. So if you keep the data files, the rebalancing
might be faster but most of the pieces will be deleted anyway from the disk and replaced by other ones.
First, set up a new drive to store the metadata directory for the replacement node (a SSD is recommended),
and for the data directory if necessary. You can then start Garage on the new node.
The restarted node should generate a new node ID, and it should be shown as `NOT CONFIGURED` in `garage status`.
The ID of the lost node should be shown in `garage status` in the section for disconnected/unavailable nodes.
Then, replace the broken node by the new one, using:
```
garage node configure --replace <old_node_id> \
-c <capacity> -d <datacenter> -t <node_tag> <new_node_id>
```
Garage will then start synchronizing all required data on the new node.
This process can be monitored using the `garage stats -a` command.

View file

@ -7,14 +7,14 @@ We did not test other architecture/operating system but, as long as your archite
## From Docker
Our docker image is currently named `lxpz/garage_amd64` and is stored on the [Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
We encourage you to use a fixed tag (eg. `v0.1.1d`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.1.1d` but it's up to you
We encourage you to use a fixed tag (eg. `v0.2.1`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.2.1` but it's up to you
to check [the most recent versions on the Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
For example:
```
sudo docker pull lxpz/garage_amd64:v0.1.1d
sudo docker pull lxpz/garage_amd64:v0.2.1
```
## From source

View file

@ -40,22 +40,18 @@ garagectl key new --name nextcloud-app-key
You will have the following output (this one is fake, `key_id` and `secret_key` were generated with the openssl CLI tool):
```javascript
Key {
key_id: "GK3515373e4c851ebaad366558",
secret_key: "7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34",
name: "nextcloud-app-key",
name_timestamp: 1603280506694,
deleted: false,
authorized_buckets: []
}
```
Key name: nextcloud-app-key
Key ID: GK3515373e4c851ebaad366558
Secret key: 7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34
Authorized buckets:
```
Check that everything works as intended (be careful, info works only with your key identifier and not with its friendly name!):
Check that everything works as intended:
```
garagectl key list
garagectl key info GK3515373e4c851ebaad366558
garagectl key info nextcloud-app-key
```
## Allow a key to access a bucket
@ -67,7 +63,7 @@ garagectl bucket allow \
--read \
--write
nextcloud-bucket \
--key GK3515373e4c851ebaad366558
--key nextcloud-app-key
```
You can check at any times allowed keys on your bucket with:

View file

@ -11,20 +11,20 @@ As this part is not relevant for a test cluster, you can use this one-liner to c
```bash
garagectl status | grep UNCONFIGURED | grep -Po '^[0-9a-f]+' | while read id; do
garagectl node configure -d dc1 -n 10 $id
garagectl node configure -d dc1 -c 1 $id
done
```
## Real-world cluster
For our example, we will suppose we have the following infrastructure (Tokens, Identifier and Datacenter are specific values to garage described in the following):
For our example, we will suppose we have the following infrastructure (Capacity, Identifier and Datacenter are specific values to garage described in the following):
| Location | Name | Disk Space | `Tokens` | `Identifier` | `Datacenter` |
|----------|---------|------------|----------|--------------|--------------|
| Paris | Mercury | 1 To | `100` | `8781c5` | `par1` |
| Paris | Venus | 2 To | `200` | `2a638e` | `par1` |
| London | Earth | 2 To | `200` | `68143d` | `lon1` |
| Brussels | Mars | 1.5 To | `150` | `212f75` | `bru1` |
| Location | Name | Disk Space | `Capacity` | `Identifier` | `Datacenter` |
|----------|---------|------------|------------|--------------|--------------|
| Paris | Mercury | 1 To | `2` | `8781c5` | `par1` |
| Paris | Venus | 2 To | `4` | `2a638e` | `par1` |
| London | Earth | 2 To | `4` | `68143d` | `lon1` |
| Brussels | Mars | 1.5 To | `3` | `212f75` | `bru1` |
### Identifier
@ -45,14 +45,15 @@ garagectl status
It will display the IP address associated with each node; from the IP address you will be able to recognize the node.
### Tokens
### Capacity
Garage reasons on an arbitrary metric about disk storage that is named "tokens".
The number of tokens must be proportional to the disk space dedicated to the node.
Additionaly, ideally the number of tokens must be in the order of magnitude of 100
to provide a good trade-off between data load balancing and performances (*this sentence must be verified, it may be wrong*).
Garage reasons on an arbitrary metric about disk storage that is named the *capacity* of a node.
The capacity configured in Garage must be proportional to the disk space dedicated to the node.
Additionaly, the capacity values used in Garage should be as small as possible, with
1 ideally representing the size of your smallest server.
Here we chose 1 token = 10 Go but you are free to select the value that best fit your needs.
Here we chose that 1 unit of capacity = 0.5 To, so that we can express servers of size
1 To and 2 To, as wel as the intermediate size 1.5 To.
### Datacenter
@ -65,8 +66,8 @@ Behind the scene, garage will try to store the same data on different sites to p
Given the information above, we will configure our cluster as follow:
```
garagectl node configure --datacenter par1 -n 100 -t mercury 8781c5
garagectl node configure --datacenter par1 -n 200 -t venus 2a638e
garagectl node configure --datacenter lon1 -n 200 -t earth 68143d
garagectl node configure --datacenter bru1 -n 150 -t mars 212f75
garagectl node configure --datacenter par1 -c 2 -t mercury 8781c5
garagectl node configure --datacenter par1 -c 4 -t venus 2a638e
garagectl node configure --datacenter lon1 -c 4 -t earth 68143d
garagectl node configure --datacenter bru1 -c 3 -t mars 212f75
```

View file

@ -124,8 +124,8 @@ For our example, we will suppose the following infrastructure:
|----------|---------|------------|------------|
| Paris | Mercury | fc00:1::1 | 1 To |
| Paris | Venus | fc00:1::2 | 2 To |
| London | Earth | fc00:1::2 | 2 To |
| Brussels | Mars | fc00:B::1 | 1.5 To |
| London | Earth | fc00:B::1 | 2 To |
| Brussels | Mars | fc00:F::1 | 1.5 To |
On each machine, we will have a similar setup, especially you must consider the following folders/files:
- `/etc/garage/pki`: Garage certificates, must be generated on your computer and copied on the servers

View file

@ -4,63 +4,68 @@
</a>
</p>
```
This very website is hosted using Garage. In other words: the doc is the PoC!
```
# The Garage Geo-Distributed Data Store
Garage is a lightweight geo-distributed data store.
It comes from the observation that despite numerous object stores
many people have broken data management policies (backup/replication on a single site or none at all).
To promote better data management policies, with focused on the following desirable properties:
To promote better data management policies, we focused on the following **desirable properties**:
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target hyperconverged infrastructures
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures
- **Simple**: simple to understand, simple to operate, simple to debug
- **Internet enabled**: made for multi-sites (eg. datacenter, offices, etc.) interconnected through a regular internet connection.
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
- **Simple**: simple to understand, simple to operate, simple to debug.
- **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
We also noted that the pursuit of some other goals are detrimental to our initial goals.
The following have been identified has non-goals, if it matters to you, you should not use Garage:
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
- **Extreme performances**: high performances constrain a lot the design and the infrastructure; we seek performances through minimalism only.
- **Feature extensiveness**: complete implementation of the S3 API or any other API to make garage a drop-in replacement is not targeted as it could lead to decisions impacting our desirable properties.
- **Storage optimizations**: erasure coding or any other coding technique both increase the difficulty of placing data and synchronizing; we limit ourselves to duplication.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such syncronizations are translated in network messages that impose severe constraints on the deployment.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such synchronizations are translated in network messages that impose severe constraints on the deployment.
## Supported and planned protocols
Garage speaks (or will speak) the following protocols:
- [S3](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html) - *SUPPORTED* - Enable applications to store large blobs such as pictures, video, images, documents, etc. S3 is versatile enough to also be used to publish a static website.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good oerformances.
To keep performances optimals, most imap servers only support on-disk storage.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good performances.
To keep performances optimal, most IMAP servers only support on-disk storage.
We plan to add logic to Garage to make it a viable solution for email storage.
- *More to come*
## Use Cases
**[Deuxfleurs](https://deuxfleurs.fr):** Garage is used by Deuxfleurs which is a non-profit hosting organization.
Especially, it is used to host their main website, this documentation and some of its members's blogs. Additionally,
Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html). Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and has the backend of [OCIS](https://github.com/owncloud/ocis).
Especially, it is used to host their main website, this documentation and some of its members' blogs.
Additionally, Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html).
Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and as the backend of [OCIS](https://github.com/owncloud/ocis).
*Are you using Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add your organization here!*
## Comparison to existing software
**[Minio](https://min.io/) :** Minio shares our *self-contained & lightweight* goal but selected two of our non-goals: *storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, minio do not reach our desirable properties.
First, it fails on the *simple* property: due to the erasure coding, minio has severe limitations on how drives can be added or deleted from a cluster.
Second, it fails on the *interned enabled* property: due to its strong consistency, minio is latency sensitive.
Furthermore, minio has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[MinIO](https://min.io/):** MinIO shares our *Self-contained & lightweight* goal but selected two of our non-goals: *Storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, MinIO do not reach our desirable properties.
Firstly, it fails on the *Simple* property: due to the erasure coding, MinIO has severe limitations on how drives can be added or deleted from a cluster.
Secondly, it fails on the *Internet enabled* property: due to its strong consistency, MinIO is latency sensitive.
Furthermore, MinIO has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[Openstack Swift](https://docs.openstack.org/swift/latest/):**
OpenStack Swift at least fails on the *self-contained & lightweight* goal.
Starting it requires around 8Gb of RAM, which is too much especially in an hyperconverged infrastructure.
It seems also to be far from *Simple*.
OpenStack Swift at least fails on the *Self-contained & lightweight* goal.
Starting it requires around 8GB of RAM, which is too much especially in an hyperconverged infrastructure.
We also do not classify Swift as *Simple*.
**[Ceph](https://ceph.io/ceph-storage/object-storage/):**
This review holds for the whole Ceph stack, including the RADOS paper, Ceph Object Storage module, the RADOS Gateway, etc.
At is core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and Minio are closer togethers than they are from Garage or OpenStack Swift.
At its core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency-sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *Self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and MinIO are closer together than they are from Garage or OpenStack Swift.
*More comparisons are available in our [Related Work](design/related_work.md) chapter.*
@ -71,29 +76,29 @@ We reference here other places on the Internet where you can learn more about Ga
### Rust API (docs.rs)
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code documentation!
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code's documentation!
- [garage\_api](https://docs.rs/garage_api/latest/garage_api/) - contains the S3 standard API endpoint
- [garage\_model](https://docs.rs/garage_model/latest/garage_model/) - contains Garage's model built on the table abstraction
- [garage\_rpc](https://docs.rs/garage_rpc/latest/garage_rpc/) - contains Garage's federation protocol
- [garage\_table](https://docs.rs/garage_table/latest/garage_table/) - contains core Garage's CRDT datatypes
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage entrypoints (daemon, cli)
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage helpers
- [garage\_web](https://docs.rs/garage_web/latest/garage_web/) - contains the S3 website endpoint
### Talks
We love to talk and hear about Garage, that's why we keep a log here:
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/master/doc/20201202_talk/talk.pdf)
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/20201202_talk/talk.pdf)
*Did you write or talk about Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add a link here!*
## Community
If you want to discuss with us, you can join our Matrix channel at [#garage:deuxfleurs.fr](https://matrix.to/#/#garage:deuxfleurs.fr).
Our code and our issue tracker, which is the place where you should report bugs, are managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
Our code repository and issue tracker, which is the place where you should report bugs, is managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
## License
Garage, all the source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Garage's source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Please note that if you patch Garage and then use it to provide any service over a network, you must share your code!

View file

@ -0,0 +1,6 @@
FROM rust:buster
RUN apt-get update && \
apt-get install --yes libsodium-dev awscli python-pip && \
rm -rf /var/lib/apt/lists/*
RUN rustup component add rustfmt
RUN pip install s3cmd

View file

@ -0,0 +1,8 @@
DOCKER=lxpz/garage_builder_amd64
docker:
docker build -t $(DOCKER):$(TAG) .
docker push $(DOCKER):$(TAG)
docker tag $(DOCKER):$(TAG) $(DOCKER):latest
docker push $(DOCKER):latest

View file

@ -35,6 +35,9 @@ data_replication_factor = 3
meta_replication_factor = 3
meta_epidemic_fanout = 3
enable_compression = true
compressin_level = 10
[s3_api]
api_bind_addr = "0.0.0.0:$((3910+$count))" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
s3_region = "garage" # set this to anything. S3 API calls will fail if they are not made against the region set here.

View file

@ -23,39 +23,43 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # < INLINE_THRESHOLD = 307
dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5
dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10
dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64
dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64
dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64
echo "s3 api testing..."
for idx in $(seq 1 3); do
for idx in {1,2,3}.{rnd,b64}; do
# AWS sends
awsgrg cp /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.aws
awsgrg cp /tmp/garage.$idx s3://eprouvette/garage.$idx.aws
awsgrg ls s3://eprouvette
awsgrg cp s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
s3grg get s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
awsgrg rm s3://eprouvette/garage.$idx.aws
# S3CMD sends
s3grg put /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.s3cmd
s3grg put /tmp/garage.$idx s3://eprouvette/garage.$idx.s3cmd
s3grg ls s3://eprouvette
s3grg get s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
awsgrg cp s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
s3grg rm s3://eprouvette/garage.$idx.s3cmd
done
rm /tmp/garage.{1,2,3}.rnd
rm /tmp/garage.{1,2,3}.{rnd,b64}
echo "website testing"
echo "<h1>hello world</h1>" > /tmp/garage-index.html

View file

@ -22,10 +22,12 @@ bytes = "1.0"
chrono = "0.4"
crypto-mac = "0.10"
err-derive = "0.3"
fastcdc = "1.0.5"
hex = "0.4"
hmac = "0.10"
log = "0.4"
md-5 = "0.9"
rand = "0.7"
sha2 = "0.9"
futures = "0.3"

View file

@ -20,6 +20,7 @@ use crate::s3_get::*;
use crate::s3_list::*;
use crate::s3_put::*;
/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,

View file

@ -1,9 +1,13 @@
//! Module containing various helpers for encoding
/// Escape &str for xml inclusion
pub fn xml_escape(s: &str) -> String {
s.replace("<", "&lt;")
.replace(">", "&gt;")
.replace("\"", "&quot;")
}
/// Encode &str for use in a URI
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
let mut result = String::with_capacity(string.len() * 2);
for c in string.chars() {
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
result
}
/// Encode &str either as an uri, or a valid string for xml inclusion
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
if urlencode {
uri_encode(k, true)

View file

@ -3,44 +3,57 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
// Category: internal error
/// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
/// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error),
/// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error),
// Category: cannot process
/// No proper api key was used, or the signature was invalid
#[error(display = "Forbidden: {}", _0)]
Forbidden(String),
/// The object requested don't exists
#[error(display = "Not found")]
NotFound,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
/// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
/// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError),
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
InvalidXML(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
/// The client sent an invalid request
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
}
impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
@ -65,6 +79,7 @@ impl Error {
}
}
/// Trait to map error to the Bad Request error code
pub trait OkOrBadRequest {
type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
}
}
/// Trait to map an error to an Internal Error code
pub trait OkOrInternalError {
type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;

View file

@ -1,15 +1,19 @@
//! Crate for serving a S3 compatible API
#[macro_use]
extern crate log;
pub mod error;
mod error;
pub use error::Error;
pub mod encoding;
mod encoding;
pub mod api_server;
pub mod signature;
mod api_server;
pub use api_server::run_api_server;
pub mod s3_copy;
pub mod s3_delete;
mod signature;
mod s3_copy;
mod s3_delete;
pub mod s3_get;
pub mod s3_list;
pub mod s3_put;
mod s3_list;
mod s3_put;

View file

@ -1,3 +1,4 @@
//! Function related to GET and HEAD requests
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
@ -79,6 +80,7 @@ fn try_answer_cached(
}
}
/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
req: &Request<Body>,
@ -118,6 +120,7 @@ pub async fn handle_head(
Ok(response)
}
/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
req: &Request<Body>,
@ -224,7 +227,7 @@ pub async fn handle_get(
}
}
pub async fn handle_get_range(
async fn handle_get_range(
garage: Arc<Garage>,
version: &ObjectVersion,
version_data: &ObjectVersionData,

View file

@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::fmt::Write;
use std::sync::Arc;
use fastcdc::{Chunk, FastCDC};
use futures::stream::*;
use hyper::{Body, Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
@ -22,6 +23,8 @@ use crate::encoding::*;
use crate::error::*;
use crate::signature::verify_signed_content;
// ---- PutObject call ----
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
@ -150,159 +153,6 @@ pub async fn handle_put(
Ok(put_response(version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
struct BodyChunker {
body: Body,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
Self {
body,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else if self.buf.len() <= self.block_size {
let block = self.buf.drain(..).collect::<Vec<u8>>();
Ok(Some(block))
} else {
let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
Ok(Some(block))
}
}
}
pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
@ -311,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
// ---- Mutlipart upload calls ----
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,
@ -575,59 +427,7 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
Ok(req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
// ---- Parsing input to multipart upload calls ----
fn decode_upload_id(id: &str) -> Result<UUID, Error> {
let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
@ -674,3 +474,224 @@ fn parse_complete_multpart_upload_body(
Some(parts)
}
// ---- Common code ----
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string();
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
struct BodyChunker {
body: Body,
read_all: bool,
min_block_size: usize,
avg_block_size: usize,
max_block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
let min_block_size = block_size / 4 * 3;
let avg_block_size = block_size;
let max_block_size = block_size * 2;
Self {
body,
read_all: false,
min_block_size,
avg_block_size,
max_block_size,
buf: VecDeque::with_capacity(2 * max_block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.max_block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else {
let mut iter = FastCDC::with_eof(
self.buf.make_contiguous(),
self.min_block_size,
self.avg_block_size,
self.max_block_size,
self.read_all,
);
if let Some(Chunk { length, .. }) = iter.next() {
let block = self.buf.drain(..length).collect::<Vec<u8>>();
Ok(Some(block))
} else {
unreachable!("FastCDC returned not chunk")
}
}
}
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}

View file

@ -246,15 +246,13 @@ impl AdminRpcHandler {
)))
}
KeyOperation::Import(query) => {
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
.await?;
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
if prev_key.is_some() {
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
self.garage.key_table.insert(&imported_key).await?;
Ok(AdminRPC::KeyInfo(imported_key))
}
}
}

View file

@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::data::UUID;
use garage_util::error::Error;
use garage_util::time::*;
use garage_rpc::membership::*;
@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "c", long = "capacity")]
capacity: Option<u32>,
/// Optionnal node tag
/// Optional node tag
#[structopt(short = "t", long = "tag")]
tag: Option<String>,
@ -384,7 +384,10 @@ pub async fn cmd_status(
Ok(())
}
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = UUID>,
pattern: &str,
) -> Result<UUID, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
@ -428,7 +431,10 @@ pub async fn cmd_configure(
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
}
}

View file

@ -1,4 +1,5 @@
#![recursion_limit = "1024"]
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
#[macro_use]
extern crate log;
@ -25,7 +26,7 @@ use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
pub struct Opt {
struct Opt {
/// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
pub rpc_host: SocketAddr,

View file

@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_api::api_server;
use garage_api::run_api_server;
use garage_model::garage::Garage;
use garage_rpc::rpc_server::RpcServer;
use garage_web::web_server;
use garage_web::run_web_server;
use crate::admin_rpc::*;
@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
let bootstrap = garage.system.clone().bootstrap(
&config.bootstrap_peers[..],
config.bootstrap_peers,
config.consul_host,
config.consul_service_name,
);
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
bootstrap.map(|rv| {

View file

@ -21,8 +21,8 @@ arc-swap = "1.0"
hex = "0.4"
log = "0.4"
rand = "0.8"
sled = "0.34"
zstd = "0.6.1"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }

View file

@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*;
use garage_util::error::Error;
@ -18,12 +19,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*;
use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +35,56 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash),
PutBlock(PutBlockMessage),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock {
hash: Hash,
data: BlockData,
},
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool),
}
/// A possibly compressed block of data
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
pub hash: Hash,
pub enum BlockData {
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
}
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
impl BlockData {
pub fn is_compressed(&self) -> bool {
match self {
BlockData::Plain(_) => false,
BlockData::Compressed(_) => true,
}
}
pub fn buffer(&self) -> &Vec<u8> {
match self {
BlockData::Plain(b) => b,
BlockData::Compressed(b) => b,
}
}
}
impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@ -120,7 +150,7 @@ impl BlockManager {
async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
match msg {
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
Message::PutBlock { hash, data } => self.write_block(&hash, &data).await,
Message::GetBlock(h) => self.read_block(h).await,
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
_ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
@ -128,7 +158,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@ -141,70 +172,124 @@ impl BlockManager {
}
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
/// Write a block to disk
pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result<Message, Error> {
let mut path = self.block_dir(hash);
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
fs::create_dir_all(&path).await?;
let clean_plain = match self.is_block_compressed(hash).await {
Ok(true) => return Ok(Message::Ok),
Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either
Ok(false) => true,
Err(_) => false,
};
fs::create_dir_all(&path).await?;
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
return Ok(Message::Ok);
if data.is_compressed() {
path.set_extension("zst");
}
let mut f = fs::File::create(path).await?;
f.write_all(data).await?;
let buffer = data.buffer();
let mut f = fs::File::create(path.clone()).await?;
f.write_all(&buffer).await?;
drop(f);
if clean_plain {
path.set_extension("");
fs::remove_file(path).await?;
}
Ok(Message::Ok)
}
/// Read block from disk, verifying it's integrity
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
let mut path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
Ok(f) => f,
Err(e) => {
let mut data = vec![];
let block = match self.is_block_compressed(hash).await {
Ok(false) => {
let f = fs::File::open(&path).await;
f.map(|f| (f, false)).map_err(Into::into)
}
Ok(true) => {
path.set_extension("zst");
let f = fs::File::open(&path).await;
f.map(|f| (f, true)).map_err(Into::into)
}
Err(e) => Err(e),
};
let (mut f, compressed) = match block {
Ok(ok) => ok,
e => {
// Not found but maybe we should have had it ??
self.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Into::into(e));
e?
}
};
let mut data = vec![];
f.read_to_end(&mut data).await?;
drop(f);
if blake2sum(&data[..]) != *hash {
let sum_ok = if compressed {
zstd_check_checksum(&data[..])
} else {
blake2sum(&data[..]) == *hash
};
if !sum_ok {
let _lock = self.data_dir_lock.lock().await;
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash
);
let mut path2 = path.clone();
path2.set_extension(".corrupted");
path2.set_extension("corrupted");
fs::rename(path, path2).await?;
self.put_to_resync(&hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash));
}
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
if compressed {
Ok(Message::PutBlock {
hash: *hash,
data: BlockData::Compressed(data),
})
} else {
Ok(Message::PutBlock {
hash: *hash,
data: BlockData::Plain(data),
})
}
}
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
.get(hash.as_ref())?
.map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if needed {
let path = self.block_path(hash);
let exists = fs::metadata(&path).await.is_ok();
let exists = self.is_block_compressed(hash).await.is_ok();
Ok(!exists)
} else {
Ok(false)
}
}
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Ok(true);
}
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
fn block_dir(&self, hash: &Hash) -> PathBuf {
let mut path = self.data_dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1]));
@ -217,6 +302,8 @@ impl BlockManager {
path
}
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +316,7 @@ impl BlockManager {
Ok(())
}
/// Decrement the number of time a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -302,7 +390,7 @@ impl BlockManager {
let path = self.block_path(hash);
let exists = fs::metadata(&path).await.is_ok();
let exists = self.is_block_compressed(hash).await.is_ok();
let needed = self
.rc
.get(hash.as_ref())?
@ -381,14 +469,14 @@ impl BlockManager {
// TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called.
let block_data = self.rpc_get_block(&hash).await?;
self.write_block(hash, &block_data[..]).await?;
let block = self.rpc_get_raw_block(&hash).await?;
self.write_block(hash, &block).await?;
}
Ok(())
}
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<BlockData, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
.rpc_client
@ -402,8 +490,9 @@ impl BlockManager {
.await?;
for resp in resps {
if let Message::PutBlock(msg) = resp {
return Ok(msg.data);
match resp {
Message::PutBlock { data, .. } => return Ok(data),
_ => {}
}
}
Err(Error::Message(format!(
@ -412,12 +501,47 @@ impl BlockManager {
)))
}
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
self.rpc_get_raw_block(hash)
.await
.and_then(|data| match data {
BlockData::Plain(data) => Ok(data),
BlockData::Compressed(data) => {
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash))
}
})
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let garage = self.garage.load_full().unwrap();
let compressed = if garage.config.enable_compression {
zstd_encode(&data[..], garage.config.compression_level).ok()
} else {
None
};
// If compressed data is not less than 7/8 of the size of the original data, i.e. if we
// don't gain a significant margin by compressing, then we store the plain data instead
// so that we don't lose time decompressing it on reads.
let block_data =
if compressed.is_some() && compressed.as_ref().unwrap().len() < (data.len() * 7) / 8 {
BlockData::Compressed(compressed.unwrap())
} else {
BlockData::Plain(data)
};
let message = Message::PutBlock {
hash,
data: block_data,
};
let who = self.replication.write_nodes(&hash);
self.rpc_client
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
message,
RequestStrategy::with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
@ -498,6 +622,7 @@ impl BlockManager {
.boxed()
}
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}
@ -513,3 +638,16 @@ fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
x8.copy_from_slice(bytes.as_ref());
u64::from_be_bytes(x8)
}
fn zstd_check_checksum<R: std::io::Read>(source: R) -> bool {
zstd::stream::copy_decode(source, std::io::sink()).is_ok()
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;
std::io::copy(&mut source, &mut encoder)?;
encoder.finish()?;
Ok(result)
}

View file

@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
// Primary key
/// Hash (blake2 sum) of the block, used as partition key
pub block: Hash,
// Sort key
/// Id of the Version for the object containing this block, used as sorting key
pub version: UUID,
// Keep track of deleted status
/// Is the Version that contains this block deleted
pub deleted: crdt::Bool,
}

View file

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
/// Name of the bucket
pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>,
}
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
/// The bucket is deleted
Deleted,
/// The bucket exists
Present(BucketParams),
}
@ -37,9 +40,12 @@ impl CRDT for BucketState {
}
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::LWW<bool>,
}
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
}
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
BucketParams {
authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
}
impl Bucket {
/// Initializes a new instance of the Bucket struct
pub fn new(name: String) -> Self {
Bucket {
name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
}
}
/// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted
}
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],

View file

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
use garage_table::replication::fullcopy::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block::*;
@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*;
use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
/// The local database
pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
}
impl Garage {
/// Create and run garage
pub fn new(
config: Config,
db: sled::Db,

View file

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
/// The id of the key (immutable), used as partition key
pub key_id: String,
// Associated secret key (immutable)
/// The secret_key associated
pub secret_key: String,
// Name
/// Name for the key
pub name: crdt::LWW<String>,
// Deletion
/// Is the key deleted
pub deleted: crdt::Bool,
// Authorized keys
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
}
impl Key {
/// Initialize a new Key, generating a random identifier and associated secret key
pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self {
key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
/// Add an authorized bucket, only if it wasn't there before
/// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
}
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}

View file

@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
// Primary key
/// The bucket in which the object is stored, used as partition key
pub bucket: String,
// Sort key
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
// Data
/// The list of currenty stored versions of the object
versions: Vec<ObjectVersion>,
}
impl Object {
/// Initialize an Object struct from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
bucket,
@ -36,6 +38,7 @@ impl Object {
}
ret
}
/// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()),
}
}
/// Get a list of currently stored versions of `Object`
pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..]
}
}
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
}
}
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
}
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool {
match self.state {
ObjectVersionState::Uploading(_) => true,
_ => false,
}
}
/// Is the object version completely received
pub fn is_complete(&self) -> bool {
match self.state {
ObjectVersionState::Complete(_) => true,
_ => false,
}
}
/// Is the object version available (received and not a tombstone)
pub fn is_data(&self) -> bool {
match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
@ -147,7 +177,9 @@ impl Entry<String, String> for Object {
&self.key
}
fn is_tombstone(&self) -> bool {
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
self.versions.len() == 1
&& self.versions[0].state
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
}
}

View file

@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::sharded::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
// Primary key
/// UUID of the version, used as partition key
pub uuid: UUID,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64,
}
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
}
}
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}

View file

@ -1,7 +1,9 @@
//! Crate containing rpc related functions and types used in Garage
#[macro_use]
extern crate log;
pub mod consul;
mod consul;
pub(crate) mod tls_util;
pub mod membership;

View file

@ -1,3 +1,4 @@
//! Module containing structs related to membership management
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@ -11,13 +12,13 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::get_consul_nodes;
@ -26,24 +27,33 @@ use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
/// Response to successfull advertisements
Ok,
/// Message sent to detect other nodes status
Ping(PingMessage),
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus,
/// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@ -55,21 +65,29 @@ pub struct PingMessage {
state_info: StateInfo,
}
/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
pub id: UUID,
/// IP and port of the node
pub addr: SocketAddr,
/// Is the node considered up
pub is_up: bool,
/// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
pub state_info: StateInfo,
}
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: UUID,
metadata_dir: PathBuf,
persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
rpc_local_port: u16,
state_info: StateInfo,
@ -78,28 +96,43 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
/// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
update_lock: Mutex<Updaters>,
/// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
struct Updaters {
update_status: watch::Sender<Arc<Status>>,
update_ring: watch::Sender<Arc<Ring>>,
}
/// The status of each nodes, viewed by this node
#[derive(Debug, Clone)]
pub struct Status {
/// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
/// The IP and port used to connect to this node
pub addr: SocketAddr,
/// Last time this node was seen
pub last_seen: u64,
/// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
/// is the node associated to this entry considered up
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@ -144,6 +177,25 @@ impl Status {
debug!("END --");
self.hash = blake2sum(nodes_txt.as_bytes());
}
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
let mut mem = vec![];
for (node, status) in self.nodes.iter() {
let state_info = if *node == system.id {
system.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
mem
}
}
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
@ -169,24 +221,8 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
}
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
.expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config)
}
impl System {
/// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@ -196,7 +232,10 @@ impl System {
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
info!("Node ID: {}", hex::encode(&id));
let net_config = match read_network_config(&metadata_dir) {
let persist_config = Persister::new(&metadata_dir, "network_config");
let persist_status = Persister::new(&metadata_dir, "peer_info");
let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
info!(
@ -206,6 +245,7 @@ impl System {
NetworkConfig::new()
}
};
let mut status = Status {
nodes: HashMap::new(),
hash: Hash::default(),
@ -231,14 +271,18 @@ impl System {
let sys = Arc::new(System {
id,
metadata_dir,
persist_config,
persist_status,
rpc_local_port: rpc_server.bind_addr.port(),
state_info,
rpc_http_client,
rpc_client,
status,
ring,
update_lock: Mutex::new((update_status, update_ring)),
update_lock: Mutex::new(Updaters {
update_status,
update_ring,
}),
background,
});
sys.clone().register_handler(rpc_server, rpc_path);
@ -263,6 +307,7 @@ impl System {
});
}
/// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@ -271,15 +316,13 @@ impl System {
)
}
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let mut path = self.metadata_dir.clone();
path.push("network_config");
let ring = self.ring.borrow().clone();
let data = rmp_to_vec_all_named(&ring.config)?;
let mut f = tokio::fs::File::create(path.as_path()).await?;
f.write_all(&data[..]).await?;
self.persist_config
.save_async(&ring.config)
.await
.expect("Cannot save current cluster configuration");
Ok(())
}
@ -306,28 +349,24 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
/// Perform bootstraping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: &[SocketAddr],
peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
self.background
.spawn_worker(format!("discovery loop"), |stop_signal| {
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
});
let self2 = self.clone();
self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal)
});
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@ -378,6 +417,8 @@ impl System {
}
} else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) {
// we need to increment failure counter as call was done using by_addr so the
// counter was not auto-incremented
st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() {
warn!("Node {:?} seems to be down.", id);
@ -394,9 +435,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
self.update_status(&update_locked, status).await;
drop(update_locked);
if to_advertise.len() > 0 {
@ -420,7 +459,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
update_locked.0.send(Arc::new(status))?;
self.update_status(&update_locked, status).await;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@ -436,23 +475,9 @@ impl System {
}
fn handle_pull_status(&self) -> Result<Message, Error> {
let status = self.status.borrow().clone();
let mut mem = vec![];
for (node, status) in status.nodes.iter() {
let state_info = if *node == self.id {
self.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
Ok(Message::AdvertiseNodesUp(
self.status.borrow().to_serializable_membership(self),
))
}
fn handle_pull_config(&self) -> Result<Message, Error> {
@ -502,7 +527,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
update_lock.0.send(Arc::new(status))?;
self.update_status(&update_lock, status).await;
drop(update_lock);
if to_ping.len() > 0 {
@ -522,7 +547,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
update_lock.1.send(Arc::new(ring))?;
update_lock.update_ring.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@ -537,7 +562,7 @@ impl System {
}
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
@ -552,34 +577,64 @@ impl System {
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => {
if *stop_signal.borrow() {
return;
}
}
_ = stop_signal.changed().fuse() => (),
}
}
}
async fn consul_loop(
async fn discovery_loop(
self: Arc<Self>,
bootstrap_peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
mut stop_signal: watch::Receiver<bool>,
consul_host: String,
consul_service_name: String,
) {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
let consul_config = match (consul_host, consul_service_name) {
(Some(ch), Some(csn)) => Some((ch, csn)),
_ => None,
};
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
self.clone().ping_nodes(ping_addrs).await;
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.len() == 0;
let no_peers = self.status.borrow().nodes.len() < 3;
let bad_peers = self
.status
.borrow()
.nodes
.iter()
.filter(|(_, v)| v.is_up())
.count() != self.ring.borrow().config.members.len();
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
let mut ping_list = bootstrap_peers
.iter()
.map(|ip| (*ip, None))
.collect::<Vec<_>>();
match self.persist_status.load_async().await {
Ok(peers) => {
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
}
_ => (),
}
if let Some((consul_host, consul_service_name)) = &consul_config {
match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
ping_list.extend(node_list.iter().map(|a| (*a, None)));
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
}
self.clone().ping_nodes(ping_list).await;
}
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (),
@ -611,4 +666,35 @@ impl System {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}
}
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
if status.hash != self.status.borrow().hash {
let mut list = status.to_serializable_membership(&self);
// Combine with old peer list to make sure no peer is lost
match self.persist_status.load_async().await {
Ok(old_list) => {
for pp in old_list {
if !list.iter().any(|np| pp.id == np.id) {
list.push(pp);
}
}
}
_ => (),
}
if list.len() > 0 {
info!("Persisting new peer list ({} peers)", list.len());
self.persist_status
.save_async(&list)
.await
.expect("Unable to persist peer list");
}
}
updaters
.update_status
.send(Arc::new(status))
.expect("Could not update internal membership status");
}
}

View file

@ -1,3 +1,5 @@
//! Module containing types related to computing nodes which should receive a copy of data blocks
//! and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions)
/// A partition id, stored on 16 bits
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant paraetrizable in the config file
// (most deployments use a replication factor of 3, so...)
/// The maximum number of time an object might get replicated
pub const MAX_REPLICATION: usize = 3;
/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
/// Map of each node's id to it's configuration
pub members: HashMap<UUID, NetworkConfigEntry>,
/// Version of this config
pub version: u64,
}
@ -37,26 +46,40 @@ impl NetworkConfig {
}
}
/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
/// Datacenter at which this entry belong. This infromation might be used to perform a better
/// geodistribution
pub datacenter: String,
/// The (relative) capacity of the node
pub capacity: u32,
/// A tag to recognize the entry, not used for other things than display
pub tag: String,
}
/// A ring distributing fairly objects to nodes
#[derive(Clone)]
pub struct Ring {
/// The network configuration used to generate this ring
pub config: NetworkConfig,
/// The list of entries in the ring
pub ring: Vec<RingEntry>,
}
/// An entry in the ring
#[derive(Clone, Debug)]
pub struct RingEntry {
/// The prefix of the Hash of object which should use this entry
pub location: Hash,
/// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION],
}
impl Ring {
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
// outsider.
pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@ -166,20 +189,16 @@ impl Ring {
})
.collect::<Vec<_>>();
// eprintln!("RING: --");
// for e in ring.iter() {
// eprintln!("{:?}", e);
// }
// eprintln!("END --");
Self { config, ring }
}
/// Get the partition in which data would fall on
pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS)
}
/// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![];
@ -193,6 +212,8 @@ impl Ring {
ret
}
// TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
// TODO why computing two time in the same way and asserting?
assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
// probably be a test more than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());

View file

@ -1,3 +1,4 @@
//! Contain structs related to making RPCs
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
/// Max time to wait for reponse
pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: usize,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
}
}
/// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
}
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
/// error
pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
}
impl<M: RpcMessage + 'static> RpcClient<M> {
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new(
rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>,
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
})
}
/// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler))));
}
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client
}
/// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
/// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
@ -128,13 +145,13 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
{
Err(rpc_error) => {
node_status.num_failures.fetch_add(1, Ordering::SeqCst);
// TODO: Save failure info somewhere
Err(Error::from(rpc_error))
}
Ok(x) => x,
}
}
/// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
@ -149,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results
}
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
/// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
to: &[UUID],
@ -208,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>,
@ -216,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
}
impl<M: RpcMessage> RpcAddrClient<M> {
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
@ -224,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
/// Make a RPC
pub async fn call<MB>(
&self,
to_addr: &SocketAddr,
@ -239,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
/// HTTP client used to make RPCs
pub struct RpcHttpClient {
request_limiter: Semaphore,
method: ClientMethod,
@ -250,6 +273,7 @@ enum ClientMethod {
}
impl RpcHttpClient {
/// Create a new RpcHttpClient
pub fn new(
max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>,
@ -280,6 +304,7 @@ impl RpcHttpClient {
})
}
/// Make a RPC
async fn call<M, MB>(
&self,
path: &str,

View file

@ -1,3 +1,4 @@
//! Contains structs related to receiving RPCs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util;
/// Trait for messages that can be sent as RPC
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
/// Structure handling RPCs
pub struct RpcServer {
/// The address the RpcServer will bind
pub bind_addr: SocketAddr,
/// The tls configuration used for RPC
pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>,
@ -87,6 +92,7 @@ where
}
impl RpcServer {
/// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self {
bind_addr,
@ -95,6 +101,7 @@ impl RpcServer {
}
}
/// Add handler handling request made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where
M: RpcMessage + 'static,
@ -156,6 +163,7 @@ impl RpcServer {
}
}
/// Run the RpcServer
pub async fn run(
self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>,

View file

@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts.
/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
ts: u64,

View file

@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] }
}
/// Add a value to the map
pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v));
}

View file

@ -35,7 +35,13 @@ where
F: TableSchema,
R: TableReplication,
{
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
pub fn new(
system: Arc<System>,
name: String,
instance: F,
replication: R,
db: &sled::Db,
) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");

View file

@ -74,7 +74,7 @@ where
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
// Stuff was done, loop imediately
// Stuff was done, loop immediately
continue;
}
Ok(false) => {
@ -157,7 +157,12 @@ where
if errs.is_empty() {
Ok(true)
} else {
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
Err(Error::Message(
errs.into_iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
))
}
}

View file

@ -8,10 +8,10 @@ pub mod schema;
pub mod util;
pub mod data;
pub mod gc;
pub mod merkle;
mod gc;
mod merkle;
pub mod replication;
pub mod sync;
mod sync;
pub mod table;
pub use schema::*;

View file

@ -200,12 +200,13 @@ where
let subnode = self.read_node_txn(tx, &key_sub)?;
match subnode {
MerkleNode::Empty => {
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
warn!(
"({}) Single subnode in tree is empty Merkle node",
self.data.name
);
Some(MerkleNode::Empty)
}
MerkleNode::Intermediate(_) => {
Some(MerkleNode::Intermediate(children))
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
tx.remove(key_sub.encode())?;
Some(x)
@ -239,14 +240,24 @@ where
{
let exlf_subkey = key.next_key(&exlf_khash);
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
let exlf_sub_hash = self
.update_item_rec(
tx,
&exlf_k[..],
&exlf_khash,
&exlf_subkey,
Some(exlf_vhash),
)?
.unwrap();
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
assert_eq!(int.len(), 1);
}
{
let key2 = key.next_key(khash);
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
let subhash = self
.update_item_rec(tx, k, khash, &key2, new_vhash)?
.unwrap();
intermediate_set_child(&mut int, key2.prefix[i], subhash);
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
assert_eq!(int.len(), 1);

View file

@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*;
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>,
/// Max number of faults allowed while replicating a record
pub max_faults: usize,
}
impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id]
}

View file

@ -1,6 +1,8 @@
mod parameters;
pub mod fullcopy;
pub mod sharded;
mod fullcopy;
mod sharded;
pub use fullcopy::TableFullReplication;
pub use parameters::*;
pub use sharded::TableShardedReplication;

View file

@ -2,20 +2,25 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
// Which nodes to send reads from
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
// Which nodes to send writes to
/// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>;
}

View file

@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*;
/// Sharded replication schema:
/// - based on the ring of nodes, a certain set of neighbors
/// store entries, given as a function of the position of the
/// entry's hash in the ring
/// - reads are done on all of the nodes that replicate the data
/// - writes as well
#[derive(Clone)]
pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>,
/// How many time each data should be replicated
pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize,
}
impl TableReplication for TableShardedReplication {
// Sharded replication schema:
// - based on the ring of nodes, a certain set of neighbors
// store entries, given as a function of the position of the
// entry's hash in the ring
// - reads are done on all of the nodes that replicate the data
// - writes as well
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)

View file

@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT;
/// Trait for field used to partition data
pub trait PartitionKey {
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
}
}
/// Trait for field used to sort data
pub trait SortKey {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@ -36,25 +40,34 @@ impl SortKey for Hash {
}
}
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
/// Get the key used to sort
fn sort_key(&self) -> &S;
/// Is the entry a tombstone? Default implementation always return false
fn is_tombstone(&self) -> bool {
false
}
}
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync {
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version:
// try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None
}
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
// to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}

View file

@ -1,3 +1,4 @@
//! Job runner for futures and async functions
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>,
stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done)
}
// Spawn a task to be run in background
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap();
}
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,

View file

@ -1,3 +1,4 @@
//! Contains type and functions related to Garage configuration file
use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
@ -6,57 +7,89 @@ use serde::{de, Deserialize};
use crate::error::Error;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>,
/// Consule host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
/// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_size")]
pub block_size: usize,
#[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize,
/// How many nodes should hold a copy of meta data
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
/// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
/// Enable Zstd compression of block data
pub enable_compression: bool,
/// Zstd compression level used on data blocks
#[serde(default)]
pub compression_level: i32,
/// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>,
/// Configuration for S3 api
pub s3_api: ApiConfig,
/// Configuration for serving files as normal web server
pub s3_web: WebConfig,
}
/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
/// Path to certificate autority used for all nodes
pub ca_cert: String,
/// Path to public certificate for this node
pub node_cert: String,
/// Path to private key for this node
pub node_key: String,
}
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: SocketAddr,
/// S3 region to use
pub s3_region: String,
}
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: SocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String,
/// Suffix to add when user-agent request path end with "/"
pub index: String,
}
@ -73,6 +106,7 @@ fn default_control_write_max_faults() -> usize {
1
}
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)

View file

@ -1,8 +1,10 @@
//! Contains common types and functions related to serialization and integrity
use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
}
impl FixedBytes32 {
/// Access the content as a slice
pub fn as_slice(&self) -> &[u8] {
&self.0[..]
}
/// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..]
}
/// Copy to a slice
pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec()
}
/// Try building a FixedBytes32 from a slice
/// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 {
return None;
@ -80,9 +87,12 @@ impl FixedBytes32 {
}
}
/// A 32 bytes UUID
pub type UUID = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32;
/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256};
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into()
}
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest};
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into()
}
/// A 64 bit non cryptographic hash
pub type FastHash = u64;
/// Compute a (non cryptographic) of a slice
pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3;
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest()
}
/// Generate a random 32 bytes UUID
pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into()
}
// RMP serialization with names of fields and variants
/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where
T: Serialize + ?Sized,
@ -131,10 +146,13 @@ where
Ok(wr)
}
/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
Ok(ss) => {
if ss.len() > 100 {
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
// (or more) codepoint
ss[..100].to_string()
} else {
ss

View file

@ -1,9 +1,11 @@
//! Module containing error types used in Garage
use err_derive::Error;
use hyper::StatusCode;
use std::io;
use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)]
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
@ -28,6 +30,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>),
}
/// Regroup all Garage errors
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "IO error: {}", _0)]

View file

@ -1,3 +1,5 @@
//! Crate containing common functions and types used in Garage
#[macro_use]
extern crate log;
@ -5,4 +7,5 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
pub mod persister;
pub mod time;

72
src/util/persister.rs Normal file
View file

@ -0,0 +1,72 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
impl<T> Persister<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
let mut path = base_dir.clone();
path.push(file_name);
Self {
path,
_marker: Default::default(),
}
}
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.path)?;
file.write_all(&bytes[..])?;
Ok(())
}
pub async fn load_async(&self) -> Result<T, Error> {
let mut file = tokio::fs::File::open(&self.path).await?;
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;
Ok(())
}
}

View file

@ -1,6 +1,8 @@
//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;

View file

@ -3,30 +3,37 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)]
ApiError(#[error(source)] garage_api::error::Error),
ApiError(#[error(source)] garage_api::Error),
// Category: internal error
/// Error internal to garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
/// The file does not exist
#[error(display = "Not found")]
NotFound,
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error),
/// The client send a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
impl Error {
/// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,

View file

@ -1,6 +1,9 @@
//! Crate for handling web serving of s3 bucket
#[macro_use]
extern crate log;
pub mod error;
mod error;
pub use error::Error;
pub mod web_server;
mod web_server;
pub use web_server::run_web_server;

View file

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
/// Run a web server
pub async fn run_web_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,