Compare commits
35 commits
Author | SHA1 | Date | |
---|---|---|---|
c3bd672d58 | |||
93a9f96130 | |||
7380f3855c | |||
6cbc8d6ec9 | |||
b3b0b20d72 | |||
47d0aee9f8 | |||
6e0cb2dfb6 | |||
c4c4b7dedc | |||
8225fa2e4b | |||
ab67bd88de | |||
0f192a96b5 | |||
7b85056942 | |||
7fd1f9a869 | |||
c5d8dc7d6d | |||
fa11cb746a | |||
f11bd80d2a | |||
595dc0ed0d | |||
78eeaab5ed | |||
22fbb3b892 | |||
0eb5baea1a | |||
7d772737a5 | |||
4c2d8f5a96 | |||
0325086dac | |||
700925263f | |||
6c83f66700 | |||
55e4a93bad | |||
|
9cb9945131 | ||
7f9c1d5595 | |||
991cf1b818 | |||
e26cb2640d | |||
22041e924b | |||
|
b119e9d3c4 | ||
3e6534d7a8 | |||
2dae4a25d6 | |||
fcd566e89d |
25 changed files with 741 additions and 385 deletions
12
.drone.yml
12
.drone.yml
|
@ -1,3 +1,4 @@
|
|||
---
|
||||
kind: pipeline
|
||||
name: default
|
||||
|
||||
|
@ -35,7 +36,9 @@ steps:
|
|||
commands:
|
||||
- apt-get update
|
||||
- apt-get install --yes libsodium-dev
|
||||
- rustup component add rustfmt
|
||||
- pwd
|
||||
- cargo fmt -- --check
|
||||
- cargo build
|
||||
|
||||
- name: cargo-test
|
||||
|
@ -106,6 +109,15 @@ steps:
|
|||
endpoint: https://garage.deuxfleurs.fr
|
||||
region: garage
|
||||
when:
|
||||
event:
|
||||
- push
|
||||
branch:
|
||||
- main
|
||||
repo:
|
||||
- Deuxfleurs/garage
|
||||
|
||||
---
|
||||
kind: signature
|
||||
hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e
|
||||
|
||||
...
|
||||
|
|
88
Cargo.lock
generated
88
Cargo.lock
generated
|
@ -221,6 +221,12 @@ dependencies = [
|
|||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fastcdc"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f"
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
|
@ -365,7 +371,7 @@ dependencies = [
|
|||
"hex",
|
||||
"log",
|
||||
"pretty_env_logger",
|
||||
"rand",
|
||||
"rand 0.8.3",
|
||||
"rmp-serde",
|
||||
"serde",
|
||||
"sled",
|
||||
|
@ -383,6 +389,7 @@ dependencies = [
|
|||
"chrono",
|
||||
"crypto-mac 0.10.0",
|
||||
"err-derive",
|
||||
"fastcdc",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"garage_model",
|
||||
|
@ -397,6 +404,7 @@ dependencies = [
|
|||
"log",
|
||||
"md-5",
|
||||
"percent-encoding",
|
||||
"rand 0.7.3",
|
||||
"roxmltree",
|
||||
"sha2",
|
||||
"tokio",
|
||||
|
@ -415,7 +423,7 @@ dependencies = [
|
|||
"garage_util",
|
||||
"hex",
|
||||
"log",
|
||||
"rand",
|
||||
"rand 0.8.3",
|
||||
"rmp-serde",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
|
@ -459,7 +467,7 @@ dependencies = [
|
|||
"garage_util",
|
||||
"hexdump",
|
||||
"log",
|
||||
"rand",
|
||||
"rand 0.8.3",
|
||||
"rmp-serde",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
|
@ -479,7 +487,7 @@ dependencies = [
|
|||
"http",
|
||||
"hyper",
|
||||
"log",
|
||||
"rand",
|
||||
"rand 0.8.3",
|
||||
"rmp-serde",
|
||||
"rustls",
|
||||
"serde",
|
||||
|
@ -529,6 +537,17 @@ dependencies = [
|
|||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.1.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi 0.9.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.2"
|
||||
|
@ -537,7 +556,7 @@ checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
|
|||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.10.2+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1043,6 +1062,19 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
|
||||
dependencies = [
|
||||
"getrandom 0.1.16",
|
||||
"libc",
|
||||
"rand_chacha 0.2.2",
|
||||
"rand_core 0.5.1",
|
||||
"rand_hc 0.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.8.3"
|
||||
|
@ -1050,9 +1082,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
"rand_hc",
|
||||
"rand_chacha 0.3.0",
|
||||
"rand_core 0.6.2",
|
||||
"rand_hc 0.3.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1062,7 +1104,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core",
|
||||
"rand_core 0.6.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
|
||||
dependencies = [
|
||||
"getrandom 0.1.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1071,7 +1122,16 @@ version = "0.6.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"getrandom 0.2.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_hc"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
|
||||
dependencies = [
|
||||
"rand_core 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1080,7 +1140,7 @@ version = "0.3.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
|
||||
dependencies = [
|
||||
"rand_core",
|
||||
"rand_core 0.6.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1581,6 +1641,12 @@ dependencies = [
|
|||
"try-lock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.9.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.10.2+wasi-snapshot-preview1"
|
||||
|
|
2
Makefile
2
Makefile
|
@ -2,7 +2,7 @@ BIN=target/release/garage
|
|||
DOCKER=lxpz/garage_amd64
|
||||
|
||||
all:
|
||||
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
|
||||
clear; cargo build
|
||||
|
||||
$(BIN):
|
||||
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
|
||||
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg?ref=refs/heads/main)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
|
||||
===
|
||||
|
||||
<p align="center" style="text-align:center;">
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
- [Host a website](./cookbook/website.md)
|
||||
- [Integrate as a media backend]()
|
||||
- [Operate a cluster]()
|
||||
- [Recovering from failures](./cookbook/recovering.md)
|
||||
|
||||
- [Reference Manual](./reference_manual/index.md)
|
||||
- [Garage CLI]()
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
|
||||
A cookbook, when you cook, is a collection of recipes.
|
||||
Similarly, Garage's cookbook contains a collection of recipes that are known to works well!
|
||||
This chapter could also be referred as "Tutorials" or "Best practises".
|
||||
This chapter could also be referred as "Tutorials" or "Best practices".
|
||||
|
|
99
doc/book/src/cookbook/recovering.md
Normal file
99
doc/book/src/cookbook/recovering.md
Normal file
|
@ -0,0 +1,99 @@
|
|||
# Recovering from failures
|
||||
|
||||
Garage is meant to work on old, second-hand hardware.
|
||||
In particular, this makes it likely that some of your drives will fail, and some manual intervention will be needed.
|
||||
Fear not! For Garage is fully equipped to handle drive failures, in most common cases.
|
||||
|
||||
## A note on availability of Garage
|
||||
|
||||
With nodes dispersed in 3 datacenters or more, here are the guarantees Garage provides with the default replication strategy (3 copies of all data, which is the recommended value):
|
||||
|
||||
- The cluster remains fully functional as long as the machines that fail are in only one datacenter. This includes a whole datacenter going down due to power/Internet outage.
|
||||
- No data is lost as long as the machines that fail are in at most two datacenters.
|
||||
|
||||
Of course this only works if your Garage nodes are correctly configured to be aware of the datacenter in which they are located.
|
||||
Make sure this is the case using `garage status` to check on the state of your cluster's configuration.
|
||||
|
||||
|
||||
## First option: removing a node
|
||||
|
||||
If you don't have spare parts (HDD, SDD) to replace the failed component, and if there are enough remaining nodes in your cluster
|
||||
(at least 3), you can simply remove the failed node from Garage's configuration.
|
||||
Note that if you **do** intend to replace the failed parts by new ones, using this method followed by adding back the node is **not recommended** (although it should work),
|
||||
and you should instead use one of the methods detailed in the next sections.
|
||||
|
||||
Removing a node is done with the following command:
|
||||
|
||||
```
|
||||
garage node remove --yes <node_id>
|
||||
```
|
||||
|
||||
(you can get the `node_id` of the failed node by running `garage status`)
|
||||
|
||||
This will repartition the data and ensure that 3 copies of everything are present on the nodes that remain available.
|
||||
|
||||
|
||||
|
||||
## Replacement scenario 1: only data is lost, metadata is fine
|
||||
|
||||
The recommended deployment for Garage uses an SSD to store metadata, and an HDD to store blocks of data.
|
||||
In the case where only a single HDD crashes, the blocks of data are lost but the metadata is still fine.
|
||||
|
||||
This is very easy to recover by setting up a new HDD to replace the failed one.
|
||||
The node does not need to be fully replaced and the configuration doesn't need to change.
|
||||
We just need to tell Garage to get back all the data blocks and store them on the new HDD.
|
||||
|
||||
First, set up a new HDD to store Garage's data directory on the failed node, and restart Garage using
|
||||
the existing configuration. Then, run:
|
||||
|
||||
```
|
||||
garage repair -a --yes blocks
|
||||
```
|
||||
|
||||
This will re-synchronize blocks of data that are missing to the new HDD, reading them from copies located on other nodes.
|
||||
|
||||
You can check on the advancement of this process by doing the following command:
|
||||
|
||||
```
|
||||
garage stats -a
|
||||
```
|
||||
|
||||
Look out for the following output:
|
||||
|
||||
```
|
||||
Block manager stats:
|
||||
resync queue length: 26541
|
||||
```
|
||||
|
||||
This indicates that one of the Garage node is in the process of retrieving missing data from other nodes.
|
||||
This number decreases to zero when the node is fully synchronized.
|
||||
|
||||
|
||||
## Replacement scenario 2: metadata (and possibly data) is lost
|
||||
|
||||
This scenario covers the case where a full node fails, i.e. both the metadata directory and
|
||||
the data directory are lost, as well as the case where only the metadata directory is lost.
|
||||
|
||||
To replace the lost node, we will start from an empty metadata directory, which means
|
||||
Garage will generate a new node ID for the replacement node.
|
||||
We will thus need to remove the previous node ID from Garage's configuration and replace it by the ID of the new node.
|
||||
|
||||
If your data directory is stored on a separate drive and is still fine, you can keep it, but it is not necessary to do so.
|
||||
In all cases, the data will be rebalanced and the replacement node will not store the same pieces of data
|
||||
as were originally stored on the one that failed. So if you keep the data files, the rebalancing
|
||||
might be faster but most of the pieces will be deleted anyway from the disk and replaced by other ones.
|
||||
|
||||
First, set up a new drive to store the metadata directory for the replacement node (a SSD is recommended),
|
||||
and for the data directory if necessary. You can then start Garage on the new node.
|
||||
The restarted node should generate a new node ID, and it should be shown as `NOT CONFIGURED` in `garage status`.
|
||||
The ID of the lost node should be shown in `garage status` in the section for disconnected/unavailable nodes.
|
||||
|
||||
Then, replace the broken node by the new one, using:
|
||||
|
||||
```
|
||||
garage node configure --replace <old_node_id> \
|
||||
-c <capacity> -d <datacenter> -t <node_tag> <new_node_id>
|
||||
```
|
||||
|
||||
Garage will then start synchronizing all required data on the new node.
|
||||
This process can be monitored using the `garage stats -a` command.
|
|
@ -7,14 +7,14 @@ We did not test other architecture/operating system but, as long as your archite
|
|||
## From Docker
|
||||
|
||||
Our docker image is currently named `lxpz/garage_amd64` and is stored on the [Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
|
||||
We encourage you to use a fixed tag (eg. `v0.1.1d`) and not the `latest` tag.
|
||||
For this example, we will use the latest published version at the time of the writing which is `v0.1.1d` but it's up to you
|
||||
We encourage you to use a fixed tag (eg. `v0.2.1`) and not the `latest` tag.
|
||||
For this example, we will use the latest published version at the time of the writing which is `v0.2.1` but it's up to you
|
||||
to check [the most recent versions on the Docker Hub](https://hub.docker.com/r/lxpz/garage_amd64/tags?page=1&ordering=last_updated).
|
||||
|
||||
For example:
|
||||
|
||||
```
|
||||
sudo docker pull lxpz/garage_amd64:v0.1.1d
|
||||
sudo docker pull lxpz/garage_amd64:v0.2.1
|
||||
```
|
||||
|
||||
## From source
|
||||
|
|
|
@ -40,22 +40,18 @@ garagectl key new --name nextcloud-app-key
|
|||
|
||||
You will have the following output (this one is fake, `key_id` and `secret_key` were generated with the openssl CLI tool):
|
||||
|
||||
```javascript
|
||||
Key {
|
||||
key_id: "GK3515373e4c851ebaad366558",
|
||||
secret_key: "7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34",
|
||||
name: "nextcloud-app-key",
|
||||
name_timestamp: 1603280506694,
|
||||
deleted: false,
|
||||
authorized_buckets: []
|
||||
}
|
||||
```
|
||||
Key name: nextcloud-app-key
|
||||
Key ID: GK3515373e4c851ebaad366558
|
||||
Secret key: 7d37d093435a41f2aab8f13c19ba067d9776c90215f56614adad6ece597dbb34
|
||||
Authorized buckets:
|
||||
```
|
||||
|
||||
Check that everything works as intended (be careful, info works only with your key identifier and not with its friendly name!):
|
||||
Check that everything works as intended:
|
||||
|
||||
```
|
||||
garagectl key list
|
||||
garagectl key info GK3515373e4c851ebaad366558
|
||||
garagectl key info nextcloud-app-key
|
||||
```
|
||||
|
||||
## Allow a key to access a bucket
|
||||
|
@ -67,7 +63,7 @@ garagectl bucket allow \
|
|||
--read \
|
||||
--write
|
||||
nextcloud-bucket \
|
||||
--key GK3515373e4c851ebaad366558
|
||||
--key nextcloud-app-key
|
||||
```
|
||||
|
||||
You can check at any times allowed keys on your bucket with:
|
||||
|
|
|
@ -11,20 +11,20 @@ As this part is not relevant for a test cluster, you can use this one-liner to c
|
|||
|
||||
```bash
|
||||
garagectl status | grep UNCONFIGURED | grep -Po '^[0-9a-f]+' | while read id; do
|
||||
garagectl node configure -d dc1 -n 10 $id
|
||||
garagectl node configure -d dc1 -c 1 $id
|
||||
done
|
||||
```
|
||||
|
||||
## Real-world cluster
|
||||
|
||||
For our example, we will suppose we have the following infrastructure (Tokens, Identifier and Datacenter are specific values to garage described in the following):
|
||||
For our example, we will suppose we have the following infrastructure (Capacity, Identifier and Datacenter are specific values to garage described in the following):
|
||||
|
||||
| Location | Name | Disk Space | `Tokens` | `Identifier` | `Datacenter` |
|
||||
|----------|---------|------------|----------|--------------|--------------|
|
||||
| Paris | Mercury | 1 To | `100` | `8781c5` | `par1` |
|
||||
| Paris | Venus | 2 To | `200` | `2a638e` | `par1` |
|
||||
| London | Earth | 2 To | `200` | `68143d` | `lon1` |
|
||||
| Brussels | Mars | 1.5 To | `150` | `212f75` | `bru1` |
|
||||
| Location | Name | Disk Space | `Capacity` | `Identifier` | `Datacenter` |
|
||||
|----------|---------|------------|------------|--------------|--------------|
|
||||
| Paris | Mercury | 1 To | `2` | `8781c5` | `par1` |
|
||||
| Paris | Venus | 2 To | `4` | `2a638e` | `par1` |
|
||||
| London | Earth | 2 To | `4` | `68143d` | `lon1` |
|
||||
| Brussels | Mars | 1.5 To | `3` | `212f75` | `bru1` |
|
||||
|
||||
### Identifier
|
||||
|
||||
|
@ -45,14 +45,15 @@ garagectl status
|
|||
|
||||
It will display the IP address associated with each node; from the IP address you will be able to recognize the node.
|
||||
|
||||
### Tokens
|
||||
### Capacity
|
||||
|
||||
Garage reasons on an arbitrary metric about disk storage that is named "tokens".
|
||||
The number of tokens must be proportional to the disk space dedicated to the node.
|
||||
Additionaly, ideally the number of tokens must be in the order of magnitude of 100
|
||||
to provide a good trade-off between data load balancing and performances (*this sentence must be verified, it may be wrong*).
|
||||
Garage reasons on an arbitrary metric about disk storage that is named the *capacity* of a node.
|
||||
The capacity configured in Garage must be proportional to the disk space dedicated to the node.
|
||||
Additionaly, the capacity values used in Garage should be as small as possible, with
|
||||
1 ideally representing the size of your smallest server.
|
||||
|
||||
Here we chose 1 token = 10 Go but you are free to select the value that best fit your needs.
|
||||
Here we chose that 1 unit of capacity = 0.5 To, so that we can express servers of size
|
||||
1 To and 2 To, as wel as the intermediate size 1.5 To.
|
||||
|
||||
### Datacenter
|
||||
|
||||
|
@ -65,8 +66,8 @@ Behind the scene, garage will try to store the same data on different sites to p
|
|||
Given the information above, we will configure our cluster as follow:
|
||||
|
||||
```
|
||||
garagectl node configure --datacenter par1 -n 100 -t mercury 8781c5
|
||||
garagectl node configure --datacenter par1 -n 200 -t venus 2a638e
|
||||
garagectl node configure --datacenter lon1 -n 200 -t earth 68143d
|
||||
garagectl node configure --datacenter bru1 -n 150 -t mars 212f75
|
||||
garagectl node configure --datacenter par1 -c 2 -t mercury 8781c5
|
||||
garagectl node configure --datacenter par1 -c 4 -t venus 2a638e
|
||||
garagectl node configure --datacenter lon1 -c 4 -t earth 68143d
|
||||
garagectl node configure --datacenter bru1 -c 3 -t mars 212f75
|
||||
```
|
||||
|
|
|
@ -124,8 +124,8 @@ For our example, we will suppose the following infrastructure:
|
|||
|----------|---------|------------|------------|
|
||||
| Paris | Mercury | fc00:1::1 | 1 To |
|
||||
| Paris | Venus | fc00:1::2 | 2 To |
|
||||
| London | Earth | fc00:1::2 | 2 To |
|
||||
| Brussels | Mars | fc00:B::1 | 1.5 To |
|
||||
| London | Earth | fc00:B::1 | 2 To |
|
||||
| Brussels | Mars | fc00:F::1 | 1.5 To |
|
||||
|
||||
On each machine, we will have a similar setup, especially you must consider the following folders/files:
|
||||
- `/etc/garage/pki`: Garage certificates, must be generated on your computer and copied on the servers
|
||||
|
|
|
@ -4,63 +4,68 @@
|
|||
</a>
|
||||
</p>
|
||||
|
||||
```
|
||||
This very website is hosted using Garage. In other words: the doc is the PoC!
|
||||
```
|
||||
|
||||
# The Garage Geo-Distributed Data Store
|
||||
|
||||
Garage is a lightweight geo-distributed data store.
|
||||
It comes from the observation that despite numerous object stores
|
||||
many people have broken data management policies (backup/replication on a single site or none at all).
|
||||
To promote better data management policies, with focused on the following desirable properties:
|
||||
To promote better data management policies, we focused on the following **desirable properties**:
|
||||
|
||||
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target hyperconverged infrastructures
|
||||
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures
|
||||
- **Simple**: simple to understand, simple to operate, simple to debug
|
||||
- **Internet enabled**: made for multi-sites (eg. datacenter, offices, etc.) interconnected through a regular internet connection.
|
||||
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
|
||||
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
|
||||
- **Simple**: simple to understand, simple to operate, simple to debug.
|
||||
- **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
|
||||
|
||||
We also noted that the pursuit of some other goals are detrimental to our initial goals.
|
||||
The following have been identified has non-goals, if it matters to you, you should not use Garage:
|
||||
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
|
||||
|
||||
- **Extreme performances**: high performances constrain a lot the design and the infrastructure; we seek performances through minimalism only.
|
||||
- **Feature extensiveness**: complete implementation of the S3 API or any other API to make garage a drop-in replacement is not targeted as it could lead to decisions impacting our desirable properties.
|
||||
- **Storage optimizations**: erasure coding or any other coding technique both increase the difficulty of placing data and synchronizing; we limit ourselves to duplication.
|
||||
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such syncronizations are translated in network messages that impose severe constraints on the deployment.
|
||||
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such synchronizations are translated in network messages that impose severe constraints on the deployment.
|
||||
|
||||
## Supported and planned protocols
|
||||
|
||||
Garage speaks (or will speak) the following protocols:
|
||||
|
||||
- [S3](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html) - *SUPPORTED* - Enable applications to store large blobs such as pictures, video, images, documents, etc. S3 is versatile enough to also be used to publish a static website.
|
||||
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good oerformances.
|
||||
To keep performances optimals, most imap servers only support on-disk storage.
|
||||
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good performances.
|
||||
To keep performances optimal, most IMAP servers only support on-disk storage.
|
||||
We plan to add logic to Garage to make it a viable solution for email storage.
|
||||
- *More to come*
|
||||
|
||||
## Use Cases
|
||||
|
||||
**[Deuxfleurs](https://deuxfleurs.fr) :** Garage is used by Deuxfleurs which is a non-profit hosting organization.
|
||||
Especially, it is used to host their main website, this documentation and some of its members's blogs. Additionally,
|
||||
Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html). Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and has the backend of [OCIS](https://github.com/owncloud/ocis).
|
||||
**[Deuxfleurs](https://deuxfleurs.fr):** Garage is used by Deuxfleurs which is a non-profit hosting organization.
|
||||
Especially, it is used to host their main website, this documentation and some of its members' blogs.
|
||||
Additionally, Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html).
|
||||
Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and as the backend of [OCIS](https://github.com/owncloud/ocis).
|
||||
|
||||
*Are you using Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add your organization here!*
|
||||
|
||||
## Comparison to existing software
|
||||
|
||||
**[Minio](https://min.io/) :** Minio shares our *self-contained & lightweight* goal but selected two of our non-goals: *storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
|
||||
However, by pursuing these two non-goals, minio do not reach our desirable properties.
|
||||
First, it fails on the *simple* property: due to the erasure coding, minio has severe limitations on how drives can be added or deleted from a cluster.
|
||||
Second, it fails on the *interned enabled* property: due to its strong consistency, minio is latency sensitive.
|
||||
Furthermore, minio has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
|
||||
**[MinIO](https://min.io/):** MinIO shares our *Self-contained & lightweight* goal but selected two of our non-goals: *Storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
|
||||
However, by pursuing these two non-goals, MinIO do not reach our desirable properties.
|
||||
Firstly, it fails on the *Simple* property: due to the erasure coding, MinIO has severe limitations on how drives can be added or deleted from a cluster.
|
||||
Secondly, it fails on the *Internet enabled* property: due to its strong consistency, MinIO is latency sensitive.
|
||||
Furthermore, MinIO has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
|
||||
|
||||
**[Openstack Swift](https://docs.openstack.org/swift/latest/) :**
|
||||
OpenStack Swift at least fails on the *self-contained & lightweight* goal.
|
||||
Starting it requires around 8Gb of RAM, which is too much especially in an hyperconverged infrastructure.
|
||||
It seems also to be far from *Simple*.
|
||||
**[Openstack Swift](https://docs.openstack.org/swift/latest/):**
|
||||
OpenStack Swift at least fails on the *Self-contained & lightweight* goal.
|
||||
Starting it requires around 8GB of RAM, which is too much especially in an hyperconverged infrastructure.
|
||||
We also do not classify Swift as *Simple*.
|
||||
|
||||
**[Ceph](https://ceph.io/ceph-storage/object-storage/) :**
|
||||
**[Ceph](https://ceph.io/ceph-storage/object-storage/):**
|
||||
This review holds for the whole Ceph stack, including the RADOS paper, Ceph Object Storage module, the RADOS Gateway, etc.
|
||||
At is core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
|
||||
makes Ceph latency sensitive and fails our *Internet enabled* goal.
|
||||
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
|
||||
In a certain way, Ceph and Minio are closer togethers than they are from Garage or OpenStack Swift.
|
||||
At its core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
|
||||
makes Ceph latency-sensitive and fails our *Internet enabled* goal.
|
||||
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *Self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
|
||||
In a certain way, Ceph and MinIO are closer together than they are from Garage or OpenStack Swift.
|
||||
|
||||
*More comparisons are available in our [Related Work](design/related_work.md) chapter.*
|
||||
|
||||
|
@ -71,29 +76,29 @@ We reference here other places on the Internet where you can learn more about Ga
|
|||
|
||||
### Rust API (docs.rs)
|
||||
|
||||
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code documentation!
|
||||
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code's documentation!
|
||||
|
||||
- [garage\_api](https://docs.rs/garage_api/latest/garage_api/) - contains the S3 standard API endpoint
|
||||
- [garage\_model](https://docs.rs/garage_model/latest/garage_model/) - contains Garage's model built on the table abstraction
|
||||
- [garage\_rpc](https://docs.rs/garage_rpc/latest/garage_rpc/) - contains Garage's federation protocol
|
||||
- [garage\_table](https://docs.rs/garage_table/latest/garage_table/) - contains core Garage's CRDT datatypes
|
||||
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage entrypoints (daemon, cli)
|
||||
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage helpers
|
||||
- [garage\_web](https://docs.rs/garage_web/latest/garage_web/) - contains the S3 website endpoint
|
||||
|
||||
### Talks
|
||||
|
||||
We love to talk and hear about Garage, that's why we keep a log here:
|
||||
|
||||
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/master/doc/20201202_talk/talk.pdf)
|
||||
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/20201202_talk/talk.pdf)
|
||||
|
||||
*Did you write or talk about Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add a link here!*
|
||||
|
||||
## Community
|
||||
|
||||
If you want to discuss with us, you can join our Matrix channel at [#garage:deuxfleurs.fr](https://matrix.to/#/#garage:deuxfleurs.fr).
|
||||
Our code and our issue tracker, which is the place where you should report bugs, are managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
|
||||
Our code repository and issue tracker, which is the place where you should report bugs, is managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
|
||||
|
||||
## License
|
||||
|
||||
Garage, all the source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
|
||||
Garage's source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
|
||||
Please note that if you patch Garage and then use it to provide any service over a network, you must share your code!
|
||||
|
|
|
@ -22,10 +22,12 @@ bytes = "1.0"
|
|||
chrono = "0.4"
|
||||
crypto-mac = "0.10"
|
||||
err-derive = "0.3"
|
||||
fastcdc = "1.0.5"
|
||||
hex = "0.4"
|
||||
hmac = "0.10"
|
||||
log = "0.4"
|
||||
md-5 = "0.9"
|
||||
rand = "0.7"
|
||||
sha2 = "0.9"
|
||||
|
||||
futures = "0.3"
|
||||
|
|
|
@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque};
|
|||
use std::fmt::Write;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fastcdc::{Chunk, FastCDC};
|
||||
use futures::stream::*;
|
||||
use hyper::{Body, Request, Response};
|
||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||
|
@ -22,6 +23,8 @@ use crate::encoding::*;
|
|||
use crate::error::*;
|
||||
use crate::signature::verify_signed_content;
|
||||
|
||||
// ---- PutObject call ----
|
||||
|
||||
pub async fn handle_put(
|
||||
garage: Arc<Garage>,
|
||||
req: Request<Body>,
|
||||
|
@ -150,159 +153,6 @@ pub async fn handle_put(
|
|||
Ok(put_response(version_uuid, md5sum_hex))
|
||||
}
|
||||
|
||||
/// Validate MD5 sum against content-md5 header
|
||||
/// and sha256sum against signed content-sha256
|
||||
fn ensure_checksum_matches(
|
||||
data_md5sum: &[u8],
|
||||
data_sha256sum: garage_util::data::FixedBytes32,
|
||||
content_md5: Option<&str>,
|
||||
content_sha256: Option<garage_util::data::FixedBytes32>,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(expected_sha256) = content_sha256 {
|
||||
if expected_sha256 != data_sha256sum {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Unable to validate x-amz-content-sha256"
|
||||
)));
|
||||
} else {
|
||||
trace!("Successfully validated x-amz-content-sha256");
|
||||
}
|
||||
}
|
||||
if let Some(expected_md5) = content_md5 {
|
||||
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
|
||||
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
|
||||
} else {
|
||||
trace!("Successfully validated content-md5");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_and_put_blocks(
|
||||
garage: &Garage,
|
||||
version: &Version,
|
||||
part_number: u64,
|
||||
first_block: Vec<u8>,
|
||||
first_block_hash: Hash,
|
||||
chunker: &mut BodyChunker,
|
||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||
let mut md5hasher = Md5::new();
|
||||
let mut sha256hasher = Sha256::new();
|
||||
md5hasher.update(&first_block[..]);
|
||||
sha256hasher.update(&first_block[..]);
|
||||
|
||||
let mut next_offset = first_block.len();
|
||||
let mut put_curr_version_block = put_block_meta(
|
||||
&garage,
|
||||
&version,
|
||||
part_number,
|
||||
0,
|
||||
first_block_hash,
|
||||
first_block.len() as u64,
|
||||
);
|
||||
let mut put_curr_block = garage
|
||||
.block_manager
|
||||
.rpc_put_block(first_block_hash, first_block);
|
||||
|
||||
loop {
|
||||
let (_, _, next_block) =
|
||||
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
|
||||
if let Some(block) = next_block {
|
||||
md5hasher.update(&block[..]);
|
||||
sha256hasher.update(&block[..]);
|
||||
let block_hash = blake2sum(&block[..]);
|
||||
let block_len = block.len();
|
||||
put_curr_version_block = put_block_meta(
|
||||
&garage,
|
||||
&version,
|
||||
part_number,
|
||||
next_offset as u64,
|
||||
block_hash,
|
||||
block_len as u64,
|
||||
);
|
||||
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
|
||||
next_offset += block_len;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let total_size = next_offset as u64;
|
||||
let data_md5sum = md5hasher.finalize();
|
||||
|
||||
let data_sha256sum = sha256hasher.finalize();
|
||||
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
|
||||
|
||||
Ok((total_size, data_md5sum, data_sha256sum))
|
||||
}
|
||||
|
||||
async fn put_block_meta(
|
||||
garage: &Garage,
|
||||
version: &Version,
|
||||
part_number: u64,
|
||||
offset: u64,
|
||||
hash: Hash,
|
||||
size: u64,
|
||||
) -> Result<(), GarageError> {
|
||||
let mut version = version.clone();
|
||||
version.blocks.put(
|
||||
VersionBlockKey {
|
||||
part_number,
|
||||
offset,
|
||||
},
|
||||
VersionBlock { hash, size },
|
||||
);
|
||||
|
||||
let block_ref = BlockRef {
|
||||
block: hash,
|
||||
version: version.uuid,
|
||||
deleted: false.into(),
|
||||
};
|
||||
|
||||
futures::try_join!(
|
||||
garage.version_table.insert(&version),
|
||||
garage.block_ref_table.insert(&block_ref),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct BodyChunker {
|
||||
body: Body,
|
||||
read_all: bool,
|
||||
block_size: usize,
|
||||
buf: VecDeque<u8>,
|
||||
}
|
||||
|
||||
impl BodyChunker {
|
||||
fn new(body: Body, block_size: usize) -> Self {
|
||||
Self {
|
||||
body,
|
||||
read_all: false,
|
||||
block_size,
|
||||
buf: VecDeque::with_capacity(2 * block_size),
|
||||
}
|
||||
}
|
||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
||||
while !self.read_all && self.buf.len() < self.block_size {
|
||||
if let Some(block) = self.body.next().await {
|
||||
let bytes = block?;
|
||||
trace!("Body next: {} bytes", bytes.len());
|
||||
self.buf.extend(&bytes[..]);
|
||||
} else {
|
||||
self.read_all = true;
|
||||
}
|
||||
}
|
||||
if self.buf.len() == 0 {
|
||||
Ok(None)
|
||||
} else if self.buf.len() <= self.block_size {
|
||||
let block = self.buf.drain(..).collect::<Vec<u8>>();
|
||||
Ok(Some(block))
|
||||
} else {
|
||||
let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
|
||||
Ok(Some(block))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
|
||||
Response::builder()
|
||||
.header("x-amz-version-id", hex::encode(version_uuid))
|
||||
|
@ -311,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
// ---- Mutlipart upload calls ----
|
||||
|
||||
pub async fn handle_create_multipart_upload(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<Body>,
|
||||
|
@ -575,59 +427,7 @@ pub async fn handle_abort_multipart_upload(
|
|||
Ok(Response::new(Body::from(vec![])))
|
||||
}
|
||||
|
||||
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
||||
Ok(req
|
||||
.headers()
|
||||
.get(hyper::header::CONTENT_TYPE)
|
||||
.map(|x| x.to_str())
|
||||
.unwrap_or(Ok("blob"))?
|
||||
.to_string())
|
||||
}
|
||||
|
||||
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
|
||||
let content_type = get_mime_type(req)?;
|
||||
let mut other = BTreeMap::new();
|
||||
|
||||
// Preserve standard headers
|
||||
let standard_header = vec![
|
||||
hyper::header::CACHE_CONTROL,
|
||||
hyper::header::CONTENT_DISPOSITION,
|
||||
hyper::header::CONTENT_ENCODING,
|
||||
hyper::header::CONTENT_LANGUAGE,
|
||||
hyper::header::EXPIRES,
|
||||
];
|
||||
for h in standard_header.iter() {
|
||||
if let Some(v) = req.headers().get(h) {
|
||||
match v.to_str() {
|
||||
Ok(v_str) => {
|
||||
other.insert(h.to_string(), v_str.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Discarding header {}, error in .to_str(): {}", h, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve x-amz-meta- headers
|
||||
for (k, v) in req.headers().iter() {
|
||||
if k.as_str().starts_with("x-amz-meta-") {
|
||||
match v.to_str() {
|
||||
Ok(v_str) => {
|
||||
other.insert(k.to_string(), v_str.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Discarding header {}, error in .to_str(): {}", k, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ObjectVersionHeaders {
|
||||
content_type,
|
||||
other,
|
||||
})
|
||||
}
|
||||
// ---- Parsing input to multipart upload calls ----
|
||||
|
||||
fn decode_upload_id(id: &str) -> Result<UUID, Error> {
|
||||
let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
|
||||
|
@ -674,3 +474,224 @@ fn parse_complete_multpart_upload_body(
|
|||
|
||||
Some(parts)
|
||||
}
|
||||
|
||||
// ---- Common code ----
|
||||
|
||||
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
|
||||
let content_type = req
|
||||
.headers()
|
||||
.get(hyper::header::CONTENT_TYPE)
|
||||
.map(|x| x.to_str())
|
||||
.unwrap_or(Ok("blob"))?
|
||||
.to_string();
|
||||
|
||||
let mut other = BTreeMap::new();
|
||||
|
||||
// Preserve standard headers
|
||||
let standard_header = vec![
|
||||
hyper::header::CACHE_CONTROL,
|
||||
hyper::header::CONTENT_DISPOSITION,
|
||||
hyper::header::CONTENT_ENCODING,
|
||||
hyper::header::CONTENT_LANGUAGE,
|
||||
hyper::header::EXPIRES,
|
||||
];
|
||||
for h in standard_header.iter() {
|
||||
if let Some(v) = req.headers().get(h) {
|
||||
match v.to_str() {
|
||||
Ok(v_str) => {
|
||||
other.insert(h.to_string(), v_str.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Discarding header {}, error in .to_str(): {}", h, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve x-amz-meta- headers
|
||||
for (k, v) in req.headers().iter() {
|
||||
if k.as_str().starts_with("x-amz-meta-") {
|
||||
match v.to_str() {
|
||||
Ok(v_str) => {
|
||||
other.insert(k.to_string(), v_str.to_string());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Discarding header {}, error in .to_str(): {}", k, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ObjectVersionHeaders {
|
||||
content_type,
|
||||
other,
|
||||
})
|
||||
}
|
||||
|
||||
struct BodyChunker {
|
||||
body: Body,
|
||||
read_all: bool,
|
||||
min_block_size: usize,
|
||||
avg_block_size: usize,
|
||||
max_block_size: usize,
|
||||
buf: VecDeque<u8>,
|
||||
}
|
||||
|
||||
impl BodyChunker {
|
||||
fn new(body: Body, block_size: usize) -> Self {
|
||||
let min_block_size = block_size / 4 * 3;
|
||||
let avg_block_size = block_size;
|
||||
let max_block_size = block_size * 2;
|
||||
Self {
|
||||
body,
|
||||
read_all: false,
|
||||
min_block_size,
|
||||
avg_block_size,
|
||||
max_block_size,
|
||||
buf: VecDeque::with_capacity(2 * max_block_size),
|
||||
}
|
||||
}
|
||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
||||
while !self.read_all && self.buf.len() < self.max_block_size {
|
||||
if let Some(block) = self.body.next().await {
|
||||
let bytes = block?;
|
||||
trace!("Body next: {} bytes", bytes.len());
|
||||
self.buf.extend(&bytes[..]);
|
||||
} else {
|
||||
self.read_all = true;
|
||||
}
|
||||
}
|
||||
if self.buf.len() == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
let mut iter = FastCDC::with_eof(
|
||||
self.buf.make_contiguous(),
|
||||
self.min_block_size,
|
||||
self.avg_block_size,
|
||||
self.max_block_size,
|
||||
self.read_all,
|
||||
);
|
||||
if let Some(Chunk { length, .. }) = iter.next() {
|
||||
let block = self.buf.drain(..length).collect::<Vec<u8>>();
|
||||
Ok(Some(block))
|
||||
} else {
|
||||
unreachable!("FastCDC returned not chunk")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_and_put_blocks(
|
||||
garage: &Garage,
|
||||
version: &Version,
|
||||
part_number: u64,
|
||||
first_block: Vec<u8>,
|
||||
first_block_hash: Hash,
|
||||
chunker: &mut BodyChunker,
|
||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||
let mut md5hasher = Md5::new();
|
||||
let mut sha256hasher = Sha256::new();
|
||||
md5hasher.update(&first_block[..]);
|
||||
sha256hasher.update(&first_block[..]);
|
||||
|
||||
let mut next_offset = first_block.len();
|
||||
let mut put_curr_version_block = put_block_meta(
|
||||
&garage,
|
||||
&version,
|
||||
part_number,
|
||||
0,
|
||||
first_block_hash,
|
||||
first_block.len() as u64,
|
||||
);
|
||||
let mut put_curr_block = garage
|
||||
.block_manager
|
||||
.rpc_put_block(first_block_hash, first_block);
|
||||
|
||||
loop {
|
||||
let (_, _, next_block) =
|
||||
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
|
||||
if let Some(block) = next_block {
|
||||
md5hasher.update(&block[..]);
|
||||
sha256hasher.update(&block[..]);
|
||||
let block_hash = blake2sum(&block[..]);
|
||||
let block_len = block.len();
|
||||
put_curr_version_block = put_block_meta(
|
||||
&garage,
|
||||
&version,
|
||||
part_number,
|
||||
next_offset as u64,
|
||||
block_hash,
|
||||
block_len as u64,
|
||||
);
|
||||
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
|
||||
next_offset += block_len;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let total_size = next_offset as u64;
|
||||
let data_md5sum = md5hasher.finalize();
|
||||
|
||||
let data_sha256sum = sha256hasher.finalize();
|
||||
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
|
||||
|
||||
Ok((total_size, data_md5sum, data_sha256sum))
|
||||
}
|
||||
|
||||
async fn put_block_meta(
|
||||
garage: &Garage,
|
||||
version: &Version,
|
||||
part_number: u64,
|
||||
offset: u64,
|
||||
hash: Hash,
|
||||
size: u64,
|
||||
) -> Result<(), GarageError> {
|
||||
let mut version = version.clone();
|
||||
version.blocks.put(
|
||||
VersionBlockKey {
|
||||
part_number,
|
||||
offset,
|
||||
},
|
||||
VersionBlock { hash, size },
|
||||
);
|
||||
|
||||
let block_ref = BlockRef {
|
||||
block: hash,
|
||||
version: version.uuid,
|
||||
deleted: false.into(),
|
||||
};
|
||||
|
||||
futures::try_join!(
|
||||
garage.version_table.insert(&version),
|
||||
garage.block_ref_table.insert(&block_ref),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate MD5 sum against content-md5 header
|
||||
/// and sha256sum against signed content-sha256
|
||||
fn ensure_checksum_matches(
|
||||
data_md5sum: &[u8],
|
||||
data_sha256sum: garage_util::data::FixedBytes32,
|
||||
content_md5: Option<&str>,
|
||||
content_sha256: Option<garage_util::data::FixedBytes32>,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(expected_sha256) = content_sha256 {
|
||||
if expected_sha256 != data_sha256sum {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Unable to validate x-amz-content-sha256"
|
||||
)));
|
||||
} else {
|
||||
trace!("Successfully validated x-amz-content-sha256");
|
||||
}
|
||||
}
|
||||
if let Some(expected_md5) = content_md5 {
|
||||
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
|
||||
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
|
||||
} else {
|
||||
trace!("Successfully validated content-md5");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -246,15 +246,13 @@ impl AdminRpcHandler {
|
|||
)))
|
||||
}
|
||||
KeyOperation::Import(query) => {
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
|
||||
.await?;
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
|
||||
if prev_key.is_some() {
|
||||
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
|
||||
}
|
||||
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
|
||||
self.garage.key_table.insert(&imported_key).await?;
|
||||
Ok(AdminRPC::KeyInfo(imported_key))
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::path::PathBuf;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use structopt::StructOpt;
|
||||
|
||||
use garage_util::error::Error;
|
||||
use garage_util::data::UUID;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_rpc::membership::*;
|
||||
|
@ -384,7 +384,10 @@ pub async fn cmd_status(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
|
||||
pub fn find_matching_node(
|
||||
cand: impl std::iter::Iterator<Item = UUID>,
|
||||
pattern: &str,
|
||||
) -> Result<UUID, Error> {
|
||||
let mut candidates = vec![];
|
||||
for c in cand {
|
||||
if hex::encode(&c).starts_with(&pattern) {
|
||||
|
@ -428,7 +431,10 @@ pub async fn cmd_configure(
|
|||
for replaced in args.replace.iter() {
|
||||
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
|
||||
if config.members.remove(&replaced_node).is_none() {
|
||||
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
|
||||
return Err(Error::Message(format!(
|
||||
"Cannot replace node {:?} as it is not in current configuration",
|
||||
replaced_node
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
info!("Initializing Garage main data store...");
|
||||
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
|
||||
let bootstrap = garage.system.clone().bootstrap(
|
||||
&config.bootstrap_peers[..],
|
||||
config.bootstrap_peers,
|
||||
config.consul_host,
|
||||
config.consul_service_name,
|
||||
);
|
||||
|
|
|
@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
|
|||
&self.key
|
||||
}
|
||||
fn is_tombstone(&self) -> bool {
|
||||
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
self.versions.len() == 1
|
||||
&& self.versions[0].state
|
||||
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,13 +11,13 @@ use futures::future::join_all;
|
|||
use futures::select;
|
||||
use futures_util::future::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use garage_util::background::BackgroundRunner;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::persister::Persister;
|
||||
use garage_util::time::*;
|
||||
|
||||
use crate::consul::get_consul_nodes;
|
||||
|
@ -26,7 +26,7 @@ use crate::rpc_client::*;
|
|||
use crate::rpc_server::*;
|
||||
|
||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||
|
||||
|
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
|
|||
pub struct System {
|
||||
pub id: UUID,
|
||||
|
||||
metadata_dir: PathBuf,
|
||||
persist_config: Persister<NetworkConfig>,
|
||||
persist_status: Persister<Vec<AdvertisedNode>>,
|
||||
rpc_local_port: u16,
|
||||
|
||||
state_info: StateInfo,
|
||||
|
@ -80,11 +81,16 @@ pub struct System {
|
|||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||
pub ring: watch::Receiver<Arc<Ring>>,
|
||||
|
||||
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
||||
update_lock: Mutex<Updaters>,
|
||||
|
||||
pub background: Arc<BackgroundRunner>,
|
||||
}
|
||||
|
||||
struct Updaters {
|
||||
update_status: watch::Sender<Arc<Status>>,
|
||||
update_ring: watch::Sender<Arc<Ring>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Status {
|
||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||
|
@ -144,6 +150,25 @@ impl Status {
|
|||
debug!("END --");
|
||||
self.hash = blake2sum(nodes_txt.as_bytes());
|
||||
}
|
||||
|
||||
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
|
||||
let mut mem = vec![];
|
||||
for (node, status) in self.nodes.iter() {
|
||||
let state_info = if *node == system.id {
|
||||
system.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
mem
|
||||
}
|
||||
}
|
||||
|
||||
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||
|
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
|||
}
|
||||
}
|
||||
|
||||
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
|
||||
let mut path = metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path.as_path())?;
|
||||
|
||||
let mut net_config_bytes = vec![];
|
||||
file.read_to_end(&mut net_config_bytes)?;
|
||||
|
||||
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
|
||||
.expect("Unable to parse network configuration file (has version format changed?).");
|
||||
|
||||
Ok(net_config)
|
||||
}
|
||||
|
||||
impl System {
|
||||
pub fn new(
|
||||
metadata_dir: PathBuf,
|
||||
|
@ -196,7 +204,10 @@ impl System {
|
|||
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
||||
info!("Node ID: {}", hex::encode(&id));
|
||||
|
||||
let net_config = match read_network_config(&metadata_dir) {
|
||||
let persist_config = Persister::new(&metadata_dir, "network_config");
|
||||
let persist_status = Persister::new(&metadata_dir, "peer_info");
|
||||
|
||||
let net_config = match persist_config.load() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
info!(
|
||||
|
@ -206,6 +217,7 @@ impl System {
|
|||
NetworkConfig::new()
|
||||
}
|
||||
};
|
||||
|
||||
let mut status = Status {
|
||||
nodes: HashMap::new(),
|
||||
hash: Hash::default(),
|
||||
|
@ -231,14 +243,18 @@ impl System {
|
|||
|
||||
let sys = Arc::new(System {
|
||||
id,
|
||||
metadata_dir,
|
||||
persist_config,
|
||||
persist_status,
|
||||
rpc_local_port: rpc_server.bind_addr.port(),
|
||||
state_info,
|
||||
rpc_http_client,
|
||||
rpc_client,
|
||||
status,
|
||||
ring,
|
||||
update_lock: Mutex::new((update_status, update_ring)),
|
||||
update_lock: Mutex::new(Updaters {
|
||||
update_status,
|
||||
update_ring,
|
||||
}),
|
||||
background,
|
||||
});
|
||||
sys.clone().register_handler(rpc_server, rpc_path);
|
||||
|
@ -272,14 +288,11 @@ impl System {
|
|||
}
|
||||
|
||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||
let mut path = self.metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let ring = self.ring.borrow().clone();
|
||||
let data = rmp_to_vec_all_named(&ring.config)?;
|
||||
|
||||
let mut f = tokio::fs::File::create(path.as_path()).await?;
|
||||
f.write_all(&data[..]).await?;
|
||||
self.persist_config
|
||||
.save_async(&ring.config)
|
||||
.await
|
||||
.expect("Cannot save current cluster configuration");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -308,26 +321,21 @@ impl System {
|
|||
|
||||
pub async fn bootstrap(
|
||||
self: Arc<Self>,
|
||||
peers: &[SocketAddr],
|
||||
peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
) {
|
||||
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(bootstrap_peers).await;
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("discovery loop"), |stop_signal| {
|
||||
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
|
||||
});
|
||||
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||
self2.ping_loop(stop_signal)
|
||||
});
|
||||
|
||||
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
||||
self2.consul_loop(stop_signal, consul_host, consul_service_name)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
||||
|
@ -394,9 +402,7 @@ impl System {
|
|||
if has_changes {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
if let Err(e) = update_locked.0.send(Arc::new(status)) {
|
||||
error!("In ping_nodes: could not save status update ({})", e);
|
||||
}
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if to_advertise.len() > 0 {
|
||||
|
@ -420,7 +426,7 @@ impl System {
|
|||
let status_hash = status.hash;
|
||||
let config_version = self.ring.borrow().config.version;
|
||||
|
||||
update_locked.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if is_new || status_hash != ping.status_hash {
|
||||
|
@ -436,23 +442,9 @@ impl System {
|
|||
}
|
||||
|
||||
fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||
let status = self.status.borrow().clone();
|
||||
let mut mem = vec![];
|
||||
for (node, status) in status.nodes.iter() {
|
||||
let state_info = if *node == self.id {
|
||||
self.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
Ok(Message::AdvertiseNodesUp(mem))
|
||||
Ok(Message::AdvertiseNodesUp(
|
||||
self.status.borrow().to_serializable_membership(self),
|
||||
))
|
||||
}
|
||||
|
||||
fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||
|
@ -502,7 +494,7 @@ impl System {
|
|||
if has_changed {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
update_lock.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_lock, status).await;
|
||||
drop(update_lock);
|
||||
|
||||
if to_ping.len() > 0 {
|
||||
|
@ -522,7 +514,7 @@ impl System {
|
|||
|
||||
if adv.version > ring.config.version {
|
||||
let ring = Ring::new(adv.clone());
|
||||
update_lock.1.send(Arc::new(ring))?;
|
||||
update_lock.update_ring.send(Arc::new(ring))?;
|
||||
drop(update_lock);
|
||||
|
||||
self.background.spawn_cancellable(
|
||||
|
@ -537,7 +529,7 @@ impl System {
|
|||
}
|
||||
|
||||
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
||||
loop {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(PING_INTERVAL);
|
||||
|
||||
let status = self.status.borrow().clone();
|
||||
|
@ -552,34 +544,64 @@ impl System {
|
|||
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => {
|
||||
if *stop_signal.borrow() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn consul_loop(
|
||||
async fn discovery_loop(
|
||||
self: Arc<Self>,
|
||||
bootstrap_peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
mut stop_signal: watch::Receiver<bool>,
|
||||
consul_host: String,
|
||||
consul_service_name: String,
|
||||
) {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
|
||||
let consul_config = match (consul_host, consul_service_name) {
|
||||
(Some(ch), Some(csn)) => Some((ch, csn)),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
||||
Ok(mut node_list) => {
|
||||
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(ping_addrs).await;
|
||||
while !*stop_signal.borrow() {
|
||||
let not_configured = self.ring.borrow().config.members.len() == 0;
|
||||
let no_peers = self.status.borrow().nodes.len() < 3;
|
||||
let bad_peers = self
|
||||
.status
|
||||
.borrow()
|
||||
.nodes
|
||||
.iter()
|
||||
.filter(|(_, v)| v.is_up())
|
||||
.count() != self.ring.borrow().config.members.len();
|
||||
|
||||
if not_configured || no_peers || bad_peers {
|
||||
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
|
||||
|
||||
let mut ping_list = bootstrap_peers
|
||||
.iter()
|
||||
.map(|ip| (*ip, None))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(peers) => {
|
||||
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if let Some((consul_host, consul_service_name)) = &consul_config {
|
||||
match get_consul_nodes(consul_host, consul_service_name).await {
|
||||
Ok(node_list) => {
|
||||
ping_list.extend(node_list.iter().map(|a| (*a, None)));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not retrieve node list from Consul: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.clone().ping_nodes(ping_list).await;
|
||||
}
|
||||
|
||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
|
@ -611,4 +633,35 @@ impl System {
|
|||
let _: Result<_, _> = self.handle_advertise_config(&config).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
|
||||
if status.hash != self.status.borrow().hash {
|
||||
let mut list = status.to_serializable_membership(&self);
|
||||
|
||||
// Combine with old peer list to make sure no peer is lost
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(old_list) => {
|
||||
for pp in old_list {
|
||||
if !list.iter().any(|np| pp.id == np.id) {
|
||||
list.push(pp);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if list.len() > 0 {
|
||||
info!("Persisting new peer list ({} peers)", list.len());
|
||||
self.persist_status
|
||||
.save_async(&list)
|
||||
.await
|
||||
.expect("Unable to persist peer list");
|
||||
}
|
||||
}
|
||||
|
||||
updaters
|
||||
.update_status
|
||||
.send(Arc::new(status))
|
||||
.expect("Could not update internal membership status");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,7 +128,6 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||
{
|
||||
Err(rpc_error) => {
|
||||
node_status.num_failures.fetch_add(1, Ordering::SeqCst);
|
||||
// TODO: Save failure info somewhere
|
||||
Err(Error::from(rpc_error))
|
||||
}
|
||||
Ok(x) => x,
|
||||
|
|
|
@ -35,7 +35,13 @@ where
|
|||
F: TableSchema,
|
||||
R: TableReplication,
|
||||
{
|
||||
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
|
||||
pub fn new(
|
||||
system: Arc<System>,
|
||||
name: String,
|
||||
instance: F,
|
||||
replication: R,
|
||||
db: &sled::Db,
|
||||
) -> Arc<Self> {
|
||||
let store = db
|
||||
.open_tree(&format!("{}:table", name))
|
||||
.expect("Unable to open DB tree");
|
||||
|
|
|
@ -157,7 +157,12 @@ where
|
|||
if errs.is_empty() {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
|
||||
Err(Error::Message(
|
||||
errs.into_iter()
|
||||
.map(|x| format!("{}", x))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,12 +200,13 @@ where
|
|||
let subnode = self.read_node_txn(tx, &key_sub)?;
|
||||
match subnode {
|
||||
MerkleNode::Empty => {
|
||||
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
|
||||
warn!(
|
||||
"({}) Single subnode in tree is empty Merkle node",
|
||||
self.data.name
|
||||
);
|
||||
Some(MerkleNode::Empty)
|
||||
}
|
||||
MerkleNode::Intermediate(_) => {
|
||||
Some(MerkleNode::Intermediate(children))
|
||||
}
|
||||
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
|
||||
x @ MerkleNode::Leaf(_, _) => {
|
||||
tx.remove(key_sub.encode())?;
|
||||
Some(x)
|
||||
|
@ -239,14 +240,24 @@ where
|
|||
|
||||
{
|
||||
let exlf_subkey = key.next_key(&exlf_khash);
|
||||
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
|
||||
let exlf_sub_hash = self
|
||||
.update_item_rec(
|
||||
tx,
|
||||
&exlf_k[..],
|
||||
&exlf_khash,
|
||||
&exlf_subkey,
|
||||
Some(exlf_vhash),
|
||||
)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
|
||||
assert_eq!(int.len(), 1);
|
||||
}
|
||||
|
||||
{
|
||||
let key2 = key.next_key(khash);
|
||||
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
|
||||
let subhash = self
|
||||
.update_item_rec(tx, k, khash, &key2, new_vhash)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, key2.prefix[i], subhash);
|
||||
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
|
||||
assert_eq!(int.len(), 1);
|
||||
|
|
|
@ -5,4 +5,5 @@ pub mod background;
|
|||
pub mod config;
|
||||
pub mod data;
|
||||
pub mod error;
|
||||
pub mod persister;
|
||||
pub mod time;
|
||||
|
|
72
src/util/persister.rs
Normal file
72
src/util/persister.rs
Normal file
|
@ -0,0 +1,72 @@
|
|||
use std::io::{Read, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::data::*;
|
||||
use crate::error::Error;
|
||||
|
||||
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
|
||||
path: PathBuf,
|
||||
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> Persister<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
|
||||
let mut path = base_dir.clone();
|
||||
path.push(file_name);
|
||||
Self {
|
||||
path,
|
||||
_marker: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Result<T, Error> {
|
||||
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes)?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub fn save(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&self.path)?;
|
||||
|
||||
file.write_all(&bytes[..])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_async(&self) -> Result<T, Error> {
|
||||
let mut file = tokio::fs::File::open(&self.path).await?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes).await?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&self.path).await?;
|
||||
file.write_all(&bytes[..]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue