Compare commits

...

35 commits
main ... main

Author SHA1 Message Date
c3bd672d58
Reorganize code in s3_put.rs
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-07 10:00:47 +02:00
93a9f96130
remove useless comment
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-06 23:38:30 +02:00
7380f3855c Merge pull request 'Use content defined chunking' (#43) from trinity-1686a/garage:content-defined-chunking into main
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #43
2021-04-06 22:18:41 +02:00
6cbc8d6ec9 mark branch as unreachable
All checks were successful
continuous-integration/drone/pr Build is passing
2021-04-06 16:53:39 +02:00
b3b0b20d72 run fmt
All checks were successful
continuous-integration/drone/pr Build is passing
2021-04-06 02:54:05 +02:00
47d0aee9f8 change crate used for cdc
Some checks failed
continuous-integration/drone/pr Build is failing
previous one seemed to output incorrect results
2021-04-06 02:50:07 +02:00
6e0cb2dfb6 add content defined chuking 2021-04-06 02:50:07 +02:00
c4c4b7dedc
Update documentation
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-06 00:01:49 +02:00
8225fa2e4b Merge pull request 'Improved bootstraping procedure' (#56) from better_bootstrap into main
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #56
2021-04-05 23:50:33 +02:00
ab67bd88de
Try to fix Drone
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2021-04-05 23:41:50 +02:00
0f192a96b5
small simplify
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing
2021-04-05 23:21:25 +02:00
7b85056942
Merge discovery loop with consul
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing
2021-04-05 23:04:08 +02:00
7fd1f9a869
cargo fmt
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing
2021-04-05 20:42:57 +02:00
c5d8dc7d6d
Print stats
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:42:46 +02:00
fa11cb746a
Cargo fmt
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 20:35:26 +02:00
f11bd80d2a
Keep old data
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:33:24 +02:00
595dc0ed0d
Persist directly and not in background
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:26:01 +02:00
78eeaab5ed
Install rustfmt
Some checks reported errors
continuous-integration/drone/push Build encountered an error
2021-04-05 20:00:48 +02:00
22fbb3b892
Fix Drone CI signature
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 19:58:42 +02:00
0eb5baea1a
Improve bootstraping: do it regularly; persist peer list
Some checks are pending
continuous-integration/drone/push Build is running
2021-04-05 19:55:53 +02:00
7d772737a5 Add signature to Drone CI
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 12:10:28 +02:00
4c2d8f5a96 test2
Some checks are pending
continuous-integration/drone/push Build is pending
2021-04-05 12:09:20 +02:00
0325086dac test 2021-04-05 12:08:05 +02:00
700925263f
Drone CI badge on branch main
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 11:45:20 +02:00
6c83f66700 fix command for adding node
Some checks reported errors
continuous-integration/drone/push Build was killed
2021-04-02 23:23:14 +02:00
55e4a93bad Section on recovering from failures
Some checks reported errors
continuous-integration/drone/push Build was killed
2021-04-02 19:30:29 +02:00
LUXEY Adrien
9cb9945131 [doc] Added mention that GarageHQ is hosted with Garage
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-22 16:19:58 +01:00
7f9c1d5595 publish website only in correct repo
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-22 12:49:21 +01:00
991cf1b818 fix typos
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-19 17:55:10 +01:00
e26cb2640d Merge remote-tracking branch 'origin/feature/mdbook' 2021-03-19 17:49:57 +01:00
22041e924b Merge pull request 'intro.md: fix some typos, errors & dead links, plus some stylistic stuff' (#51) from adrien/garage:feature/mdbook into feature/mdbook
Reviewed-on: #51
2021-03-19 17:14:41 +01:00
LUXEY Adrien
b119e9d3c4 intro.md: fix some typos, errors & dead links, plus some stylistic stuff
modifié :         doc/book/src/intro.md
2021-03-19 16:51:05 +01:00
3e6534d7a8 Fix garage_util description
Some checks failed
continuous-integration/drone/push Build is failing
2021-03-18 11:46:29 +01:00
2dae4a25d6 Fix some typos
Some checks failed
continuous-integration/drone/push Build is failing
2021-03-18 11:35:50 +01:00
fcd566e89d Fix a table in the doc
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-18 11:26:02 +01:00
25 changed files with 741 additions and 385 deletions

View file

@ -1,3 +1,4 @@
---
kind: pipeline
name: default
@ -35,7 +36,9 @@ steps:
commands:
- apt-get update
- apt-get install --yes libsodium-dev
- rustup component add rustfmt
- pwd
- cargo fmt -- --check
- cargo build
- name: cargo-test
@ -106,6 +109,15 @@ steps:
endpoint: https://garage.deuxfleurs.fr
region: garage
when:
event:
- push
branch:
- main
repo:
- Deuxfleurs/garage
---
kind: signature
hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e
...

88
Cargo.lock generated
View file

@ -221,6 +221,12 @@ dependencies = [
"synstructure",
]
[[package]]
name = "fastcdc"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f"
[[package]]
name = "fnv"
version = "1.0.7"
@ -365,7 +371,7 @@ dependencies = [
"hex",
"log",
"pretty_env_logger",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"sled",
@ -383,6 +389,7 @@ dependencies = [
"chrono",
"crypto-mac 0.10.0",
"err-derive",
"fastcdc",
"futures",
"futures-util",
"garage_model",
@ -397,6 +404,7 @@ dependencies = [
"log",
"md-5",
"percent-encoding",
"rand 0.7.3",
"roxmltree",
"sha2",
"tokio",
@ -415,7 +423,7 @@ dependencies = [
"garage_util",
"hex",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"serde_bytes",
@ -459,7 +467,7 @@ dependencies = [
"garage_util",
"hexdump",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"serde",
"serde_bytes",
@ -479,7 +487,7 @@ dependencies = [
"http",
"hyper",
"log",
"rand",
"rand 0.8.3",
"rmp-serde",
"rustls",
"serde",
@ -529,6 +537,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "getrandom"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.2"
@ -537,7 +556,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
@ -1043,6 +1062,19 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom 0.1.16",
"libc",
"rand_chacha 0.2.2",
"rand_core 0.5.1",
"rand_hc 0.2.0",
]
[[package]]
name = "rand"
version = "0.8.3"
@ -1050,9 +1082,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
"rand_chacha 0.3.0",
"rand_core 0.6.2",
"rand_hc 0.3.0",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core 0.5.1",
]
[[package]]
@ -1062,7 +1104,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.6.2",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom 0.1.16",
]
[[package]]
@ -1071,7 +1122,16 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom",
"getrandom 0.2.2",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core 0.5.1",
]
[[package]]
@ -1080,7 +1140,7 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core",
"rand_core 0.6.2",
]
[[package]]
@ -1581,6 +1641,12 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"

View file

@ -2,7 +2,7 @@ BIN=target/release/garage
DOCKER=lxpz/garage_amd64
all:
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
clear; cargo build
$(BIN):
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features

View file

@ -1,4 +1,4 @@
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg?ref=refs/heads/main)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
===
<p align="center" style="text-align:center;">

View file

@ -14,6 +14,7 @@
- [Host a website](./cookbook/website.md)
- [Integrate as a media backend]()
- [Operate a cluster]()
- [Recovering from failures](./cookbook/recovering.md)
- [Reference Manual](./reference_manual/index.md)
- [Garage CLI]()

View file

@ -2,4 +2,4 @@
A cookbook, when you cook, is a collection of recipes.
Similarly, Garage's cookbook contains a collection of recipes that are known to works well!
This chapter could also be referred as "Tutorials" or "Best practises".
This chapter could also be referred as "Tutorials" or "Best practices".

View file

@ -0,0 +1,99 @@
# Recovering from failures
Garage is meant to work on old, second-hand hardware.
In particular, this makes it likely that some of your drives will fail, and some manual intervention will be needed.
Fear not! For Garage is fully equipped to handle drive failures, in most common cases.
## A note on availability of Garage
With nodes dispersed in 3 datacenters or more, here are the guarantees Garage provides with the default replication strategy (3 copies of all data, which is the recommended value):
- The cluster remains fully functional as long as the machines that fail are in only one datacenter. This includes a whole datacenter going down due to power/Internet outage.
- No data is lost as long as the machines that fail are in at most two datacenters.
Of course this only works if your Garage nodes are correctly configured to be aware of the datacenter in which they are located.
Make sure this is the case using `garage status` to check on the state of your cluster's configuration.
## First option: removing a node
If you don't have spare parts (HDD, SDD) to replace the failed component, and if there are enough remaining nodes in your cluster
(at least 3), you can simply remove the failed node from Garage's configuration.
Note that if you **do** intend to replace the failed parts by new ones, using this method followed by adding back the node is **not recommended** (although it should work),
and you should instead use one of the methods detailed in the next sections.
Removing a node is done with the following command:
```
garage node remove --yes <node_id>
```
(you can get the `node_id` of the failed node by running `garage status`)
This will repartition the data and ensure that 3 copies of everything are present on the nodes that remain available.
## Replacement scenario 1: only data is lost, metadata is fine
The recommended deployment for Garage uses an SSD to store metadata, and an HDD to store blocks of data.
In the case where only a single HDD crashes, the blocks of data are lost but the metadata is still fine.
This is very easy to recover by setting up a new HDD to replace the failed one.
The node does not need to be fully replaced and the configuration doesn't need to change.
We just need to tell Garage to get back all the data blocks and store them on the new HDD.
First, set up a new HDD to store Garage's data directory on the failed node, and restart Garage using
the existing configuration. Then, run:
```
garage repair -a --yes blocks
```
This will re-synchronize blocks of data that are missing to the new HDD, reading them from copies located on other nodes.
You can check on the advancement of this process by doing the following command:
```
garage stats -a
```
Look out for the following output:
```
Block manager stats:
resync queue length: 26541
```
This indicates that one of the Garage node is in the process of retrieving missing data from other nodes.
This number decreases to zero when the node is fully synchronized.
## Replacement scenario 2: metadata (and possibly data) is lost
This scenario covers the case where a full node fails, i.e. both the metadata directory and
the data directory are lost, as well as the case where only the metadata directory is lost.
To replace the lost node, we will start from an empty metadata directory, which means
Garage will generate a new node ID for the replacement node.
We will thus need to remove the previous node ID from Garage's configuration and replace it by the ID of the new node.
If your data directory is stored on a separate drive and is still fine, you can keep it, but it is not necessary to do so.
In all cases, the data will be rebalanced and the replacement node will not store the same pieces of data
as were originally stored on the one that failed. So if you keep the data files, the rebalancing
might be faster but most of the pieces will be deleted anyway from the disk and replaced by other ones.
First, set up a new drive to store the metadata directory for the replacement node (a SSD is recommended),
and for the data directory if necessary. You can then start Garage on the new node.
The restarted node should generate a new node ID, and it should be shown as `NOT CONFIGURED` in `garage status`.
The ID of the lost node should be shown in `garage status` in the section for disconnected/unavailable nodes.
Then, replace the broken node by the new one, using:
```
garage node configure --replace <old_node_id> \
-c <capacity> -d <datacenter> -t <node_tag> <new_node_id>
```
Garage will then start synchronizing all required data on the new node.
This process can be monitored using the `garage stats -a` command.

View file

@ -7,14 +7,14 @@ We did not test other architecture/operating system but, as long as your archite
## From Docker
Our docker image is currently named `lxpz/garage_amd64` and is stored on the [Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
We encourage you to use a fixed tag (eg. `v0.1.1d`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.1.1d` but it's up to you
We encourage you to use a fixed tag (eg. `v0.2.1`) and not the `latest` tag.
For this example, we will use the latest published version at the time of the writing which is `v0.2.1` but it's up to you
to check [the most recent versions on the Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
For example:
```
sudo docker pull lxpz/garage_amd64:v0.1.1d
sudo docker pull lxpz/garage_amd64:v0.2.1
```
## From source

View file

@ -40,22 +40,18 @@ garagectl key new --name nextcloud-app-key
You will have the following output (this one is fake, `key_id` and `secret_key` were generated with the openssl CLI tool):
```javascript
Key {
key_id: "GK3515373e4c851ebaad366558",
secret_key: "7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34",
name: "nextcloud-app-key",
name_timestamp: 1603280506694,
deleted: false,
authorized_buckets: []
}
```
Key name: nextcloud-app-key
Key ID: GK3515373e4c851ebaad366558
Secret key: 7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34
Authorized buckets:
```
Check that everything works as intended (be careful, info works only with your key identifier and not with its friendly name!):
Check that everything works as intended:
```
garagectl key list
garagectl key info GK3515373e4c851ebaad366558
garagectl key info nextcloud-app-key
```
## Allow a key to access a bucket
@ -67,7 +63,7 @@ garagectl bucket allow \
--read \
--write
nextcloud-bucket \
--key GK3515373e4c851ebaad366558
--key nextcloud-app-key
```
You can check at any times allowed keys on your bucket with:

View file

@ -11,20 +11,20 @@ As this part is not relevant for a test cluster, you can use this one-liner to c
```bash
garagectl status | grep UNCONFIGURED | grep -Po '^[0-9a-f]+' | while read id; do
garagectl node configure -d dc1 -n 10 $id
garagectl node configure -d dc1 -c 1 $id
done
```
## Real-world cluster
For our example, we will suppose we have the following infrastructure (Tokens, Identifier and Datacenter are specific values to garage described in the following):
For our example, we will suppose we have the following infrastructure (Capacity, Identifier and Datacenter are specific values to garage described in the following):
| Location | Name | Disk Space | `Tokens` | `Identifier` | `Datacenter` |
|----------|---------|------------|----------|--------------|--------------|
| Paris | Mercury | 1 To | `100` | `8781c5` | `par1` |
| Paris | Venus | 2 To | `200` | `2a638e` | `par1` |
| London | Earth | 2 To | `200` | `68143d` | `lon1` |
| Brussels | Mars | 1.5 To | `150` | `212f75` | `bru1` |
| Location | Name | Disk Space | `Capacity` | `Identifier` | `Datacenter` |
|----------|---------|------------|------------|--------------|--------------|
| Paris | Mercury | 1 To | `2` | `8781c5` | `par1` |
| Paris | Venus | 2 To | `4` | `2a638e` | `par1` |
| London | Earth | 2 To | `4` | `68143d` | `lon1` |
| Brussels | Mars | 1.5 To | `3` | `212f75` | `bru1` |
### Identifier
@ -45,14 +45,15 @@ garagectl status
It will display the IP address associated with each node; from the IP address you will be able to recognize the node.
### Tokens
### Capacity
Garage reasons on an arbitrary metric about disk storage that is named "tokens".
The number of tokens must be proportional to the disk space dedicated to the node.
Additionaly, ideally the number of tokens must be in the order of magnitude of 100
to provide a good trade-off between data load balancing and performances (*this sentence must be verified, it may be wrong*).
Garage reasons on an arbitrary metric about disk storage that is named the *capacity* of a node.
The capacity configured in Garage must be proportional to the disk space dedicated to the node.
Additionaly, the capacity values used in Garage should be as small as possible, with
1 ideally representing the size of your smallest server.
Here we chose 1 token = 10 Go but you are free to select the value that best fit your needs.
Here we chose that 1 unit of capacity = 0.5 To, so that we can express servers of size
1 To and 2 To, as wel as the intermediate size 1.5 To.
### Datacenter
@ -65,8 +66,8 @@ Behind the scene, garage will try to store the same data on different sites to p
Given the information above, we will configure our cluster as follow:
```
garagectl node configure --datacenter par1 -n 100 -t mercury 8781c5
garagectl node configure --datacenter par1 -n 200 -t venus 2a638e
garagectl node configure --datacenter lon1 -n 200 -t earth 68143d
garagectl node configure --datacenter bru1 -n 150 -t mars 212f75
garagectl node configure --datacenter par1 -c 2 -t mercury 8781c5
garagectl node configure --datacenter par1 -c 4 -t venus 2a638e
garagectl node configure --datacenter lon1 -c 4 -t earth 68143d
garagectl node configure --datacenter bru1 -c 3 -t mars 212f75
```

View file

@ -124,8 +124,8 @@ For our example, we will suppose the following infrastructure:
|----------|---------|------------|------------|
| Paris | Mercury | fc00:1::1 | 1 To |
| Paris | Venus | fc00:1::2 | 2 To |
| London | Earth | fc00:1::2 | 2 To |
| Brussels | Mars | fc00:B::1 | 1.5 To |
| London | Earth | fc00:B::1 | 2 To |
| Brussels | Mars | fc00:F::1 | 1.5 To |
On each machine, we will have a similar setup, especially you must consider the following folders/files:
- `/etc/garage/pki`: Garage certificates, must be generated on your computer and copied on the servers

View file

@ -4,63 +4,68 @@
</a>
</p>
```
This very website is hosted using Garage. In other words: the doc is the PoC!
```
# The Garage Geo-Distributed Data Store
Garage is a lightweight geo-distributed data store.
It comes from the observation that despite numerous object stores
many people have broken data management policies (backup/replication on a single site or none at all).
To promote better data management policies, with focused on the following desirable properties:
To promote better data management policies, we focused on the following **desirable properties**:
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target hyperconverged infrastructures
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures
- **Simple**: simple to understand, simple to operate, simple to debug
- **Internet enabled**: made for multi-sites (eg. datacenter, offices, etc.) interconnected through a regular internet connection.
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
- **Simple**: simple to understand, simple to operate, simple to debug.
- **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
We also noted that the pursuit of some other goals are detrimental to our initial goals.
The following have been identified has non-goals, if it matters to you, you should not use Garage:
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
- **Extreme performances**: high performances constrain a lot the design and the infrastructure; we seek performances through minimalism only.
- **Feature extensiveness**: complete implementation of the S3 API or any other API to make garage a drop-in replacement is not targeted as it could lead to decisions impacting our desirable properties.
- **Storage optimizations**: erasure coding or any other coding technique both increase the difficulty of placing data and synchronizing; we limit ourselves to duplication.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such syncronizations are translated in network messages that impose severe constraints on the deployment.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such synchronizations are translated in network messages that impose severe constraints on the deployment.
## Supported and planned protocols
Garage speaks (or will speak) the following protocols:
- [S3](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html) - *SUPPORTED* - Enable applications to store large blobs such as pictures, video, images, documents, etc. S3 is versatile enough to also be used to publish a static website.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good oerformances.
To keep performances optimals, most imap servers only support on-disk storage.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good performances.
To keep performances optimal, most IMAP servers only support on-disk storage.
We plan to add logic to Garage to make it a viable solution for email storage.
- *More to come*
## Use Cases
**[Deuxfleurs](https://deuxfleurs.fr):** Garage is used by Deuxfleurs which is a non-profit hosting organization.
Especially, it is used to host their main website, this documentation and some of its members's blogs. Additionally,
Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html). Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and has the backend of [OCIS](https://github.com/owncloud/ocis).
Especially, it is used to host their main website, this documentation and some of its members' blogs.
Additionally, Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html).
Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and as the backend of [OCIS](https://github.com/owncloud/ocis).
*Are you using Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add your organization here!*
## Comparison to existing software
**[Minio](https://min.io/) :** Minio shares our *self-contained & lightweight* goal but selected two of our non-goals: *storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, minio do not reach our desirable properties.
First, it fails on the *simple* property: due to the erasure coding, minio has severe limitations on how drives can be added or deleted from a cluster.
Second, it fails on the *interned enabled* property: due to its strong consistency, minio is latency sensitive.
Furthermore, minio has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[MinIO](https://min.io/):** MinIO shares our *Self-contained & lightweight* goal but selected two of our non-goals: *Storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, MinIO do not reach our desirable properties.
Firstly, it fails on the *Simple* property: due to the erasure coding, MinIO has severe limitations on how drives can be added or deleted from a cluster.
Secondly, it fails on the *Internet enabled* property: due to its strong consistency, MinIO is latency sensitive.
Furthermore, MinIO has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[Openstack Swift](https://docs.openstack.org/swift/latest/):**
OpenStack Swift at least fails on the *self-contained & lightweight* goal.
Starting it requires around 8Gb of RAM, which is too much especially in an hyperconverged infrastructure.
It seems also to be far from *Simple*.
OpenStack Swift at least fails on the *Self-contained & lightweight* goal.
Starting it requires around 8GB of RAM, which is too much especially in an hyperconverged infrastructure.
We also do not classify Swift as *Simple*.
**[Ceph](https://ceph.io/ceph-storage/object-storage/):**
This review holds for the whole Ceph stack, including the RADOS paper, Ceph Object Storage module, the RADOS Gateway, etc.
At is core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and Minio are closer togethers than they are from Garage or OpenStack Swift.
At its core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency-sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *Self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and MinIO are closer together than they are from Garage or OpenStack Swift.
*More comparisons are available in our [Related Work](design/related_work.md) chapter.*
@ -71,29 +76,29 @@ We reference here other places on the Internet where you can learn more about Ga
### Rust API (docs.rs)
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code documentation!
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code's documentation!
- [garage\_api](https://docs.rs/garage_api/latest/garage_api/) - contains the S3 standard API endpoint
- [garage\_model](https://docs.rs/garage_model/latest/garage_model/) - contains Garage's model built on the table abstraction
- [garage\_rpc](https://docs.rs/garage_rpc/latest/garage_rpc/) - contains Garage's federation protocol
- [garage\_table](https://docs.rs/garage_table/latest/garage_table/) - contains core Garage's CRDT datatypes
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage entrypoints (daemon, cli)
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage helpers
- [garage\_web](https://docs.rs/garage_web/latest/garage_web/) - contains the S3 website endpoint
### Talks
We love to talk and hear about Garage, that's why we keep a log here:
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/master/doc/20201202_talk/talk.pdf)
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/20201202_talk/talk.pdf)
*Did you write or talk about Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add a link here!*
## Community
If you want to discuss with us, you can join our Matrix channel at [#garage:deuxfleurs.fr](https://matrix.to/#/#garage:deuxfleurs.fr).
Our code and our issue tracker, which is the place where you should report bugs, are managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
Our code repository and issue tracker, which is the place where you should report bugs, is managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
## License
Garage, all the source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Garage's source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Please note that if you patch Garage and then use it to provide any service over a network, you must share your code!

View file

@ -22,10 +22,12 @@ bytes = "1.0"
chrono = "0.4"
crypto-mac = "0.10"
err-derive = "0.3"
fastcdc = "1.0.5"
hex = "0.4"
hmac = "0.10"
log = "0.4"
md-5 = "0.9"
rand = "0.7"
sha2 = "0.9"
futures = "0.3"

View file

@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::fmt::Write;
use std::sync::Arc;
use fastcdc::{Chunk, FastCDC};
use futures::stream::*;
use hyper::{Body, Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
@ -22,6 +23,8 @@ use crate::encoding::*;
use crate::error::*;
use crate::signature::verify_signed_content;
// ---- PutObject call ----
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
@ -150,159 +153,6 @@ pub async fn handle_put(
Ok(put_response(version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
struct BodyChunker {
body: Body,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
Self {
body,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else if self.buf.len() <= self.block_size {
let block = self.buf.drain(..).collect::<Vec<u8>>();
Ok(Some(block))
} else {
let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
Ok(Some(block))
}
}
}
pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
@ -311,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
// ---- Mutlipart upload calls ----
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,
@ -575,59 +427,7 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
Ok(req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
// ---- Parsing input to multipart upload calls ----
fn decode_upload_id(id: &str) -> Result<UUID, Error> {
let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
@ -674,3 +474,224 @@ fn parse_complete_multpart_upload_body(
Some(parts)
}
// ---- Common code ----
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string();
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
struct BodyChunker {
body: Body,
read_all: bool,
min_block_size: usize,
avg_block_size: usize,
max_block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
let min_block_size = block_size / 4 * 3;
let avg_block_size = block_size;
let max_block_size = block_size * 2;
Self {
body,
read_all: false,
min_block_size,
avg_block_size,
max_block_size,
buf: VecDeque::with_capacity(2 * max_block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.max_block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else {
let mut iter = FastCDC::with_eof(
self.buf.make_contiguous(),
self.min_block_size,
self.avg_block_size,
self.max_block_size,
self.read_all,
);
if let Some(Chunk { length, .. }) = iter.next() {
let block = self.buf.drain(..length).collect::<Vec<u8>>();
Ok(Some(block))
} else {
unreachable!("FastCDC returned not chunk")
}
}
}
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}

View file

@ -246,15 +246,13 @@ impl AdminRpcHandler {
)))
}
KeyOperation::Import(query) => {
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
.await?;
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
if prev_key.is_some() {
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
self.garage.key_table.insert(&imported_key).await?;
Ok(AdminRPC::KeyInfo(imported_key))
}
}
}

View file

@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::data::UUID;
use garage_util::error::Error;
use garage_util::time::*;
use garage_rpc::membership::*;
@ -384,7 +384,10 @@ pub async fn cmd_status(
Ok(())
}
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = UUID>,
pattern: &str,
) -> Result<UUID, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
@ -428,7 +431,10 @@ pub async fn cmd_configure(
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
}
}

View file

@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
let bootstrap = garage.system.clone().bootstrap(
&config.bootstrap_peers[..],
config.bootstrap_peers,
config.consul_host,
config.consul_service_name,
);

View file

@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
&self.key
}
fn is_tombstone(&self) -> bool {
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
self.versions.len() == 1
&& self.versions[0].state
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
}
}

View file

@ -11,13 +11,13 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::get_consul_nodes;
@ -26,7 +26,7 @@ use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
pub struct System {
pub id: UUID,
metadata_dir: PathBuf,
persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
rpc_local_port: u16,
state_info: StateInfo,
@ -80,11 +81,16 @@ pub struct System {
pub(crate) status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
update_lock: Mutex<Updaters>,
pub background: Arc<BackgroundRunner>,
}
struct Updaters {
update_status: watch::Sender<Arc<Status>>,
update_ring: watch::Sender<Arc<Ring>>,
}
#[derive(Debug, Clone)]
pub struct Status {
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
@ -144,6 +150,25 @@ impl Status {
debug!("END --");
self.hash = blake2sum(nodes_txt.as_bytes());
}
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
let mut mem = vec![];
for (node, status) in self.nodes.iter() {
let state_info = if *node == system.id {
system.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
mem
}
}
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
}
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
.expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config)
}
impl System {
pub fn new(
metadata_dir: PathBuf,
@ -196,7 +204,10 @@ impl System {
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
info!("Node ID: {}", hex::encode(&id));
let net_config = match read_network_config(&metadata_dir) {
let persist_config = Persister::new(&metadata_dir, "network_config");
let persist_status = Persister::new(&metadata_dir, "peer_info");
let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
info!(
@ -206,6 +217,7 @@ impl System {
NetworkConfig::new()
}
};
let mut status = Status {
nodes: HashMap::new(),
hash: Hash::default(),
@ -231,14 +243,18 @@ impl System {
let sys = Arc::new(System {
id,
metadata_dir,
persist_config,
persist_status,
rpc_local_port: rpc_server.bind_addr.port(),
state_info,
rpc_http_client,
rpc_client,
status,
ring,
update_lock: Mutex::new((update_status, update_ring)),
update_lock: Mutex::new(Updaters {
update_status,
update_ring,
}),
background,
});
sys.clone().register_handler(rpc_server, rpc_path);
@ -272,14 +288,11 @@ impl System {
}
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let mut path = self.metadata_dir.clone();
path.push("network_config");
let ring = self.ring.borrow().clone();
let data = rmp_to_vec_all_named(&ring.config)?;
let mut f = tokio::fs::File::create(path.as_path()).await?;
f.write_all(&data[..]).await?;
self.persist_config
.save_async(&ring.config)
.await
.expect("Cannot save current cluster configuration");
Ok(())
}
@ -308,26 +321,21 @@ impl System {
pub async fn bootstrap(
self: Arc<Self>,
peers: &[SocketAddr],
peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
self.background
.spawn_worker(format!("discovery loop"), |stop_signal| {
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
});
let self2 = self.clone();
self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal)
});
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@ -394,9 +402,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
self.update_status(&update_locked, status).await;
drop(update_locked);
if to_advertise.len() > 0 {
@ -420,7 +426,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
update_locked.0.send(Arc::new(status))?;
self.update_status(&update_locked, status).await;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@ -436,23 +442,9 @@ impl System {
}
fn handle_pull_status(&self) -> Result<Message, Error> {
let status = self.status.borrow().clone();
let mut mem = vec![];
for (node, status) in status.nodes.iter() {
let state_info = if *node == self.id {
self.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
Ok(Message::AdvertiseNodesUp(
self.status.borrow().to_serializable_membership(self),
))
}
fn handle_pull_config(&self) -> Result<Message, Error> {
@ -502,7 +494,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
update_lock.0.send(Arc::new(status))?;
self.update_status(&update_lock, status).await;
drop(update_lock);
if to_ping.len() > 0 {
@ -522,7 +514,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
update_lock.1.send(Arc::new(ring))?;
update_lock.update_ring.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@ -537,7 +529,7 @@ impl System {
}
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
@ -552,34 +544,64 @@ impl System {
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => {
if *stop_signal.borrow() {
return;
}
}
_ = stop_signal.changed().fuse() => (),
}
}
}
async fn consul_loop(
async fn discovery_loop(
self: Arc<Self>,
bootstrap_peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
mut stop_signal: watch::Receiver<bool>,
consul_host: String,
consul_service_name: String,
) {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
let consul_config = match (consul_host, consul_service_name) {
(Some(ch), Some(csn)) => Some((ch, csn)),
_ => None,
};
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
self.clone().ping_nodes(ping_addrs).await;
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.len() == 0;
let no_peers = self.status.borrow().nodes.len() < 3;
let bad_peers = self
.status
.borrow()
.nodes
.iter()
.filter(|(_, v)| v.is_up())
.count() != self.ring.borrow().config.members.len();
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
let mut ping_list = bootstrap_peers
.iter()
.map(|ip| (*ip, None))
.collect::<Vec<_>>();
match self.persist_status.load_async().await {
Ok(peers) => {
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
}
_ => (),
}
if let Some((consul_host, consul_service_name)) = &consul_config {
match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
ping_list.extend(node_list.iter().map(|a| (*a, None)));
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
}
self.clone().ping_nodes(ping_list).await;
}
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (),
@ -611,4 +633,35 @@ impl System {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}
}
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
if status.hash != self.status.borrow().hash {
let mut list = status.to_serializable_membership(&self);
// Combine with old peer list to make sure no peer is lost
match self.persist_status.load_async().await {
Ok(old_list) => {
for pp in old_list {
if !list.iter().any(|np| pp.id == np.id) {
list.push(pp);
}
}
}
_ => (),
}
if list.len() > 0 {
info!("Persisting new peer list ({} peers)", list.len());
self.persist_status
.save_async(&list)
.await
.expect("Unable to persist peer list");
}
}
updaters
.update_status
.send(Arc::new(status))
.expect("Could not update internal membership status");
}
}

View file

@ -128,7 +128,6 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
{
Err(rpc_error) => {
node_status.num_failures.fetch_add(1, Ordering::SeqCst);
// TODO: Save failure info somewhere
Err(Error::from(rpc_error))
}
Ok(x) => x,

View file

@ -35,7 +35,13 @@ where
F: TableSchema,
R: TableReplication,
{
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
pub fn new(
system: Arc<System>,
name: String,
instance: F,
replication: R,
db: &sled::Db,
) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");

View file

@ -157,7 +157,12 @@ where
if errs.is_empty() {
Ok(true)
} else {
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
Err(Error::Message(
errs.into_iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
))
}
}

View file

@ -200,12 +200,13 @@ where
let subnode = self.read_node_txn(tx, &key_sub)?;
match subnode {
MerkleNode::Empty => {
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
warn!(
"({}) Single subnode in tree is empty Merkle node",
self.data.name
);
Some(MerkleNode::Empty)
}
MerkleNode::Intermediate(_) => {
Some(MerkleNode::Intermediate(children))
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
tx.remove(key_sub.encode())?;
Some(x)
@ -239,14 +240,24 @@ where
{
let exlf_subkey = key.next_key(&exlf_khash);
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
let exlf_sub_hash = self
.update_item_rec(
tx,
&exlf_k[..],
&exlf_khash,
&exlf_subkey,
Some(exlf_vhash),
)?
.unwrap();
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
assert_eq!(int.len(), 1);
}
{
let key2 = key.next_key(khash);
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
let subhash = self
.update_item_rec(tx, k, khash, &key2, new_vhash)?
.unwrap();
intermediate_set_child(&mut int, key2.prefix[i], subhash);
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
assert_eq!(int.len(), 1);

View file

@ -5,4 +5,5 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
pub mod persister;
pub mod time;

72
src/util/persister.rs Normal file
View file

@ -0,0 +1,72 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
impl<T> Persister<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
let mut path = base_dir.clone();
path.push(file_name);
Self {
path,
_marker: Default::default(),
}
}
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.path)?;
file.write_all(&bytes[..])?;
Ok(())
}
pub async fn load_async(&self) -> Result<T, Error> {
let mut file = tokio::fs::File::open(&self.path).await?;
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;
Ok(())
}
}