Compare commits
26 commits
main
...
better_boo
Author | SHA1 | Date | |
---|---|---|---|
ab67bd88de | |||
0f192a96b5 | |||
7b85056942 | |||
7fd1f9a869 | |||
c5d8dc7d6d | |||
fa11cb746a | |||
f11bd80d2a | |||
595dc0ed0d | |||
78eeaab5ed | |||
22fbb3b892 | |||
0eb5baea1a | |||
7d772737a5 | |||
4c2d8f5a96 | |||
0325086dac | |||
700925263f | |||
6c83f66700 | |||
55e4a93bad | |||
|
9cb9945131 | ||
7f9c1d5595 | |||
991cf1b818 | |||
e26cb2640d | |||
22041e924b | |||
|
b119e9d3c4 | ||
3e6534d7a8 | |||
2dae4a25d6 | |||
fcd566e89d |
17 changed files with 404 additions and 133 deletions
12
.drone.yml
12
.drone.yml
|
@ -1,3 +1,4 @@
|
|||
---
|
||||
kind: pipeline
|
||||
name: default
|
||||
|
||||
|
@ -35,7 +36,9 @@ steps:
|
|||
commands:
|
||||
- apt-get update
|
||||
- apt-get install --yes libsodium-dev
|
||||
- rustup component add rustfmt
|
||||
- pwd
|
||||
- cargo fmt -- --check
|
||||
- cargo build
|
||||
|
||||
- name: cargo-test
|
||||
|
@ -106,6 +109,15 @@ steps:
|
|||
endpoint: https://garage.deuxfleurs.fr
|
||||
region: garage
|
||||
when:
|
||||
event:
|
||||
- push
|
||||
branch:
|
||||
- main
|
||||
repo:
|
||||
- Deuxfleurs/garage
|
||||
|
||||
---
|
||||
kind: signature
|
||||
hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e
|
||||
|
||||
...
|
||||
|
|
2
Makefile
2
Makefile
|
@ -2,7 +2,7 @@ BIN=target/release/garage
|
|||
DOCKER=lxpz/garage_amd64
|
||||
|
||||
all:
|
||||
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
|
||||
clear; cargo build
|
||||
|
||||
$(BIN):
|
||||
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
|
||||
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg?ref=refs/heads/main)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
|
||||
===
|
||||
|
||||
<p align="center" style="text-align:center;">
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
- [Host a website](./cookbook/website.md)
|
||||
- [Integrate as a media backend]()
|
||||
- [Operate a cluster]()
|
||||
- [Recovering from failures](./cookbook/recovering.md)
|
||||
|
||||
- [Reference Manual](./reference_manual/index.md)
|
||||
- [Garage CLI]()
|
||||
|
|
99
doc/book/src/cookbook/recovering.md
Normal file
99
doc/book/src/cookbook/recovering.md
Normal file
|
@ -0,0 +1,99 @@
|
|||
# Recovering from failures
|
||||
|
||||
Garage is meant to work on old, second-hand hardware.
|
||||
In particular, this makes it likely that some of your drives will fail, and some manual intervention will be needed.
|
||||
Fear not! For Garage is fully equipped to handle drive failures, in most common cases.
|
||||
|
||||
## A note on availability of Garage
|
||||
|
||||
With nodes dispersed in 3 datacenters or more, here are the guarantees Garage provides with the default replication strategy (3 copies of all data, which is the recommended value):
|
||||
|
||||
- The cluster remains fully functionnal as long as the machines that fail are in only one datacenter. This includes a whole datacenter going down due to power/Internet outage.
|
||||
- No data is lost as long as the machines that fail are in at most two datacenters.
|
||||
|
||||
Of course this only works if your Garage nodes are correctly configured to be aware of the datacenter in which they are located.
|
||||
Make sure this is the case using `garage status` to check on the state of your cluster's configuration.
|
||||
|
||||
|
||||
## First option: removing a node
|
||||
|
||||
If you don't have spare parts (HDD, SDD) to replace the failed component, and if there are enough remaining nodes in your cluster
|
||||
(at least 3), you can simply remove the failed node from Garage's configuration.
|
||||
Note that if you **do** intend to replace the failed parts by new ones, using this method followed by adding back the node is **not recommended** (although it should work),
|
||||
and you should instead use one of the methods detailed in the next sections.
|
||||
|
||||
Removing a node is done with the following command:
|
||||
|
||||
```
|
||||
garage node remove --yes <node_id>
|
||||
```
|
||||
|
||||
(you can get the `node_id` of the failed node by running `garage status`)
|
||||
|
||||
This will repartition the data and ensure that 3 copies of everything are present on the nodes that remain available.
|
||||
|
||||
|
||||
|
||||
## Replacement scenario 1: only data is lost, metadata is fine
|
||||
|
||||
The recommended deployment for Garage uses an SSD to store metadata, and an HDD to store blocks of data.
|
||||
In the case where only a single HDD crashes, the blocks of data are lost but the metadata is still fine.
|
||||
|
||||
This is very easy to recover by setting up a new HDD to replace the failed one.
|
||||
The node does not need to be fully replaced and the configuration doesn't need to change.
|
||||
We just need to tell Garage to get back all the data blocks and store them on the new HDD.
|
||||
|
||||
First, set up a new HDD to store Garage's data directory on the failed node, and restart Garage using
|
||||
the existing configuration. Then, run:
|
||||
|
||||
```
|
||||
garage repair -a --yes blocks
|
||||
```
|
||||
|
||||
This will re-synchronize blocks of data that are missing to the new HDD, reading them from copies located on other nodes.
|
||||
|
||||
You can check on the advancement of this process by doing the following command:
|
||||
|
||||
```
|
||||
garage stats -a
|
||||
```
|
||||
|
||||
Look out for the following output:
|
||||
|
||||
```
|
||||
Block manager stats:
|
||||
resync queue length: 26541
|
||||
```
|
||||
|
||||
This indicates that one of the Garage node is in the process of retrieving missing data from other nodes.
|
||||
This number decreases to zero when the node is fully synchronized.
|
||||
|
||||
|
||||
## Replacement scenario 2: metadata (and possibly data) is lost
|
||||
|
||||
This scenario covers the case where a full node fails, i.e. both the metadata directory and
|
||||
the data directory are lost, as well as the case where only the metadata directory is lost.
|
||||
|
||||
To replace the lost node, we will start from an empty metadata directory, which means
|
||||
Garage will generate a new node ID for the replacement node.
|
||||
We will thus need to remove the previous node ID from Garage's configuration and replace it by the ID of the new node.
|
||||
|
||||
If your data directory is stored on a separate drive and is still fine, you can keep it, but it is not necessary to do so.
|
||||
In all cases, the data will be rebalanced and the replacement node will not store the same pieces of data
|
||||
as were originally stored on the one that failed. So if you keep the data files, the rebalancing
|
||||
might be faster but most of the pieces will be deleted anyway from the disk and replaced by other ones.
|
||||
|
||||
First, set up a new drive to store the metadata directory for the replacement node (a SSD is recommended),
|
||||
and for the data directory if necessary. You can then start Garage on the new node.
|
||||
The restarted node should generate a new node ID, and it should be shown as `NOT CONFIGURED` in `garage status`.
|
||||
The ID of the lost node should be shown in `garage status` in the section for disconnected/unavailable nodes.
|
||||
|
||||
Then, replace the broken node by the new one, using:
|
||||
|
||||
```
|
||||
garage node configure --replace <old_node_id> \
|
||||
-c <capacity> -d <datacenter> -t <node_tag> <new_node_id>
|
||||
```
|
||||
|
||||
Garage will then start synchronizing all required data on the new node.
|
||||
This process can be monitored using the `garage stats -a` command.
|
|
@ -124,8 +124,8 @@ For our example, we will suppose the following infrastructure:
|
|||
|----------|---------|------------|------------|
|
||||
| Paris | Mercury | fc00:1::1 | 1 To |
|
||||
| Paris | Venus | fc00:1::2 | 2 To |
|
||||
| London | Earth | fc00:1::2 | 2 To |
|
||||
| Brussels | Mars | fc00:B::1 | 1.5 To |
|
||||
| London | Earth | fc00:B::1 | 2 To |
|
||||
| Brussels | Mars | fc00:F::1 | 1.5 To |
|
||||
|
||||
On each machine, we will have a similar setup, especially you must consider the following folders/files:
|
||||
- `/etc/garage/pki`: Garage certificates, must be generated on your computer and copied on the servers
|
||||
|
|
|
@ -4,63 +4,68 @@
|
|||
</a>
|
||||
</p>
|
||||
|
||||
```
|
||||
This very website is hosted using Garage. In other words: the doc is the PoC!
|
||||
```
|
||||
|
||||
# The Garage Geo-Distributed Data Store
|
||||
|
||||
Garage is a lightweight geo-distributed data store.
|
||||
It comes from the observation that despite numerous object stores
|
||||
many people have broken data management policies (backup/replication on a single site or none at all).
|
||||
To promote better data management policies, with focused on the following desirable properties:
|
||||
To promote better data management policies, we focused on the following **desirable properties**:
|
||||
|
||||
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target hyperconverged infrastructures
|
||||
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures
|
||||
- **Simple**: simple to understand, simple to operate, simple to debug
|
||||
- **Internet enabled**: made for multi-sites (eg. datacenter, offices, etc.) interconnected through a regular internet connection.
|
||||
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
|
||||
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
|
||||
- **Simple**: simple to understand, simple to operate, simple to debug.
|
||||
- **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
|
||||
|
||||
We also noted that the pursuit of some other goals are detrimental to our initial goals.
|
||||
The following have been identified has non-goals, if it matters to you, you should not use Garage:
|
||||
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
|
||||
|
||||
- **Extreme performances**: high performances constrain a lot the design and the infrastructure; we seek performances through minimalism only.
|
||||
- **Feature extensiveness**: complete implementation of the S3 API or any other API to make garage a drop-in replacement is not targeted as it could lead to decisions impacting our desirable properties.
|
||||
- **Storage optimizations**: erasure coding or any other coding technique both increase the difficulty of placing data and synchronizing; we limit ourselves to duplication.
|
||||
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such syncronizations are translated in network messages that impose severe constraints on the deployment.
|
||||
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such synchronizations are translated in network messages that impose severe constraints on the deployment.
|
||||
|
||||
## Supported and planned protocols
|
||||
|
||||
Garage speaks (or will speak) the following protocols:
|
||||
|
||||
- [S3](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html) - *SUPPORTED* - Enable applications to store large blobs such as pictures, video, images, documents, etc. S3 is versatile enough to also be used to publish a static website.
|
||||
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good oerformances.
|
||||
To keep performances optimals, most imap servers only support on-disk storage.
|
||||
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good performances.
|
||||
To keep performances optimal, most IMAP servers only support on-disk storage.
|
||||
We plan to add logic to Garage to make it a viable solution for email storage.
|
||||
- *More to come*
|
||||
|
||||
## Use Cases
|
||||
|
||||
**[Deuxfleurs](https://deuxfleurs.fr) :** Garage is used by Deuxfleurs which is a non-profit hosting organization.
|
||||
Especially, it is used to host their main website, this documentation and some of its members's blogs. Additionally,
|
||||
Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html). Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and has the backend of [OCIS](https://github.com/owncloud/ocis).
|
||||
**[Deuxfleurs](https://deuxfleurs.fr):** Garage is used by Deuxfleurs which is a non-profit hosting organization.
|
||||
Especially, it is used to host their main website, this documentation and some of its members' blogs.
|
||||
Additionally, Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html).
|
||||
Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and as the backend of [OCIS](https://github.com/owncloud/ocis).
|
||||
|
||||
*Are you using Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add your organization here!*
|
||||
|
||||
## Comparison to existing software
|
||||
|
||||
**[Minio](https://min.io/) :** Minio shares our *self-contained & lightweight* goal but selected two of our non-goals: *storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
|
||||
However, by pursuing these two non-goals, minio do not reach our desirable properties.
|
||||
First, it fails on the *simple* property: due to the erasure coding, minio has severe limitations on how drives can be added or deleted from a cluster.
|
||||
Second, it fails on the *interned enabled* property: due to its strong consistency, minio is latency sensitive.
|
||||
Furthermore, minio has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
|
||||
**[MinIO](https://min.io/):** MinIO shares our *Self-contained & lightweight* goal but selected two of our non-goals: *Storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
|
||||
However, by pursuing these two non-goals, MinIO do not reach our desirable properties.
|
||||
Firstly, it fails on the *Simple* property: due to the erasure coding, MinIO has severe limitations on how drives can be added or deleted from a cluster.
|
||||
Secondly, it fails on the *Internet enabled* property: due to its strong consistency, MinIO is latency sensitive.
|
||||
Furthermore, MinIO has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
|
||||
|
||||
**[Openstack Swift](https://docs.openstack.org/swift/latest/) :**
|
||||
OpenStack Swift at least fails on the *self-contained & lightweight* goal.
|
||||
Starting it requires around 8Gb of RAM, which is too much especially in an hyperconverged infrastructure.
|
||||
It seems also to be far from *Simple*.
|
||||
**[Openstack Swift](https://docs.openstack.org/swift/latest/):**
|
||||
OpenStack Swift at least fails on the *Self-contained & lightweight* goal.
|
||||
Starting it requires around 8GB of RAM, which is too much especially in an hyperconverged infrastructure.
|
||||
We also do not classify Swift as *Simple*.
|
||||
|
||||
**[Ceph](https://ceph.io/ceph-storage/object-storage/) :**
|
||||
**[Ceph](https://ceph.io/ceph-storage/object-storage/):**
|
||||
This review holds for the whole Ceph stack, including the RADOS paper, Ceph Object Storage module, the RADOS Gateway, etc.
|
||||
At is core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
|
||||
makes Ceph latency sensitive and fails our *Internet enabled* goal.
|
||||
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
|
||||
In a certain way, Ceph and Minio are closer togethers than they are from Garage or OpenStack Swift.
|
||||
At its core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
|
||||
makes Ceph latency-sensitive and fails our *Internet enabled* goal.
|
||||
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *Self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
|
||||
In a certain way, Ceph and MinIO are closer together than they are from Garage or OpenStack Swift.
|
||||
|
||||
*More comparisons are available in our [Related Work](design/related_work.md) chapter.*
|
||||
|
||||
|
@ -71,29 +76,29 @@ We reference here other places on the Internet where you can learn more about Ga
|
|||
|
||||
### Rust API (docs.rs)
|
||||
|
||||
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code documentation!
|
||||
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code's documentation!
|
||||
|
||||
- [garage\_api](https://docs.rs/garage_api/latest/garage_api/) - contains the S3 standard API endpoint
|
||||
- [garage\_model](https://docs.rs/garage_model/latest/garage_model/) - contains Garage's model built on the table abstraction
|
||||
- [garage\_rpc](https://docs.rs/garage_rpc/latest/garage_rpc/) - contains Garage's federation protocol
|
||||
- [garage\_table](https://docs.rs/garage_table/latest/garage_table/) - contains core Garage's CRDT datatypes
|
||||
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage entrypoints (daemon, cli)
|
||||
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage helpers
|
||||
- [garage\_web](https://docs.rs/garage_web/latest/garage_web/) - contains the S3 website endpoint
|
||||
|
||||
### Talks
|
||||
|
||||
We love to talk and hear about Garage, that's why we keep a log here:
|
||||
|
||||
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/master/doc/20201202_talk/talk.pdf)
|
||||
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/20201202_talk/talk.pdf)
|
||||
|
||||
*Did you write or talk about Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add a link here!*
|
||||
|
||||
## Community
|
||||
|
||||
If you want to discuss with us, you can join our Matrix channel at [#garage:deuxfleurs.fr](https://matrix.to/#/#garage:deuxfleurs.fr).
|
||||
Our code and our issue tracker, which is the place where you should report bugs, are managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
|
||||
Our code repository and issue tracker, which is the place where you should report bugs, is managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
|
||||
|
||||
## License
|
||||
|
||||
Garage, all the source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
|
||||
Garage's source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
|
||||
Please note that if you patch Garage and then use it to provide any service over a network, you must share your code!
|
||||
|
|
|
@ -246,15 +246,13 @@ impl AdminRpcHandler {
|
|||
)))
|
||||
}
|
||||
KeyOperation::Import(query) => {
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
|
||||
.await?;
|
||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
|
||||
if prev_key.is_some() {
|
||||
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
|
||||
}
|
||||
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
|
||||
self.garage.key_table.insert(&imported_key).await?;
|
||||
Ok(AdminRPC::KeyInfo(imported_key))
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::path::PathBuf;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use structopt::StructOpt;
|
||||
|
||||
use garage_util::error::Error;
|
||||
use garage_util::data::UUID;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_rpc::membership::*;
|
||||
|
@ -384,7 +384,10 @@ pub async fn cmd_status(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
|
||||
pub fn find_matching_node(
|
||||
cand: impl std::iter::Iterator<Item = UUID>,
|
||||
pattern: &str,
|
||||
) -> Result<UUID, Error> {
|
||||
let mut candidates = vec![];
|
||||
for c in cand {
|
||||
if hex::encode(&c).starts_with(&pattern) {
|
||||
|
@ -428,7 +431,10 @@ pub async fn cmd_configure(
|
|||
for replaced in args.replace.iter() {
|
||||
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
|
||||
if config.members.remove(&replaced_node).is_none() {
|
||||
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
|
||||
return Err(Error::Message(format!(
|
||||
"Cannot replace node {:?} as it is not in current configuration",
|
||||
replaced_node
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
info!("Initializing Garage main data store...");
|
||||
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
|
||||
let bootstrap = garage.system.clone().bootstrap(
|
||||
&config.bootstrap_peers[..],
|
||||
config.bootstrap_peers,
|
||||
config.consul_host,
|
||||
config.consul_service_name,
|
||||
);
|
||||
|
|
|
@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
|
|||
&self.key
|
||||
}
|
||||
fn is_tombstone(&self) -> bool {
|
||||
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
self.versions.len() == 1
|
||||
&& self.versions[0].state
|
||||
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,13 +11,13 @@ use futures::future::join_all;
|
|||
use futures::select;
|
||||
use futures_util::future::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::watch;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use garage_util::background::BackgroundRunner;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
use garage_util::persister::Persister;
|
||||
use garage_util::time::*;
|
||||
|
||||
use crate::consul::get_consul_nodes;
|
||||
|
@ -26,7 +26,7 @@ use crate::rpc_client::*;
|
|||
use crate::rpc_server::*;
|
||||
|
||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||
|
||||
|
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
|
|||
pub struct System {
|
||||
pub id: UUID,
|
||||
|
||||
metadata_dir: PathBuf,
|
||||
persist_config: Persister<NetworkConfig>,
|
||||
persist_status: Persister<Vec<AdvertisedNode>>,
|
||||
rpc_local_port: u16,
|
||||
|
||||
state_info: StateInfo,
|
||||
|
@ -80,11 +81,16 @@ pub struct System {
|
|||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||
pub ring: watch::Receiver<Arc<Ring>>,
|
||||
|
||||
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
|
||||
update_lock: Mutex<Updaters>,
|
||||
|
||||
pub background: Arc<BackgroundRunner>,
|
||||
}
|
||||
|
||||
struct Updaters {
|
||||
update_status: watch::Sender<Arc<Status>>,
|
||||
update_ring: watch::Sender<Arc<Ring>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Status {
|
||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||
|
@ -144,6 +150,25 @@ impl Status {
|
|||
debug!("END --");
|
||||
self.hash = blake2sum(nodes_txt.as_bytes());
|
||||
}
|
||||
|
||||
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
|
||||
let mut mem = vec![];
|
||||
for (node, status) in self.nodes.iter() {
|
||||
let state_info = if *node == system.id {
|
||||
system.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
mem
|
||||
}
|
||||
}
|
||||
|
||||
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||
|
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
|||
}
|
||||
}
|
||||
|
||||
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
|
||||
let mut path = metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path.as_path())?;
|
||||
|
||||
let mut net_config_bytes = vec![];
|
||||
file.read_to_end(&mut net_config_bytes)?;
|
||||
|
||||
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
|
||||
.expect("Unable to parse network configuration file (has version format changed?).");
|
||||
|
||||
Ok(net_config)
|
||||
}
|
||||
|
||||
impl System {
|
||||
pub fn new(
|
||||
metadata_dir: PathBuf,
|
||||
|
@ -196,7 +204,10 @@ impl System {
|
|||
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
|
||||
info!("Node ID: {}", hex::encode(&id));
|
||||
|
||||
let net_config = match read_network_config(&metadata_dir) {
|
||||
let persist_config = Persister::new(&metadata_dir, "network_config");
|
||||
let persist_status = Persister::new(&metadata_dir, "peer_info");
|
||||
|
||||
let net_config = match persist_config.load() {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
info!(
|
||||
|
@ -206,6 +217,7 @@ impl System {
|
|||
NetworkConfig::new()
|
||||
}
|
||||
};
|
||||
|
||||
let mut status = Status {
|
||||
nodes: HashMap::new(),
|
||||
hash: Hash::default(),
|
||||
|
@ -231,14 +243,18 @@ impl System {
|
|||
|
||||
let sys = Arc::new(System {
|
||||
id,
|
||||
metadata_dir,
|
||||
persist_config,
|
||||
persist_status,
|
||||
rpc_local_port: rpc_server.bind_addr.port(),
|
||||
state_info,
|
||||
rpc_http_client,
|
||||
rpc_client,
|
||||
status,
|
||||
ring,
|
||||
update_lock: Mutex::new((update_status, update_ring)),
|
||||
update_lock: Mutex::new(Updaters {
|
||||
update_status,
|
||||
update_ring,
|
||||
}),
|
||||
background,
|
||||
});
|
||||
sys.clone().register_handler(rpc_server, rpc_path);
|
||||
|
@ -272,14 +288,11 @@ impl System {
|
|||
}
|
||||
|
||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||
let mut path = self.metadata_dir.clone();
|
||||
path.push("network_config");
|
||||
|
||||
let ring = self.ring.borrow().clone();
|
||||
let data = rmp_to_vec_all_named(&ring.config)?;
|
||||
|
||||
let mut f = tokio::fs::File::create(path.as_path()).await?;
|
||||
f.write_all(&data[..]).await?;
|
||||
self.persist_config
|
||||
.save_async(&ring.config)
|
||||
.await
|
||||
.expect("Cannot save current cluster configuration");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -308,26 +321,21 @@ impl System {
|
|||
|
||||
pub async fn bootstrap(
|
||||
self: Arc<Self>,
|
||||
peers: &[SocketAddr],
|
||||
peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
) {
|
||||
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(bootstrap_peers).await;
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("discovery loop"), |stop_signal| {
|
||||
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
|
||||
});
|
||||
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||
self2.ping_loop(stop_signal)
|
||||
});
|
||||
|
||||
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
||||
let self2 = self.clone();
|
||||
self.background
|
||||
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
||||
self2.consul_loop(stop_signal, consul_host, consul_service_name)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
||||
|
@ -394,9 +402,7 @@ impl System {
|
|||
if has_changes {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
if let Err(e) = update_locked.0.send(Arc::new(status)) {
|
||||
error!("In ping_nodes: could not save status update ({})", e);
|
||||
}
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if to_advertise.len() > 0 {
|
||||
|
@ -420,7 +426,7 @@ impl System {
|
|||
let status_hash = status.hash;
|
||||
let config_version = self.ring.borrow().config.version;
|
||||
|
||||
update_locked.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_locked, status).await;
|
||||
drop(update_locked);
|
||||
|
||||
if is_new || status_hash != ping.status_hash {
|
||||
|
@ -436,23 +442,9 @@ impl System {
|
|||
}
|
||||
|
||||
fn handle_pull_status(&self) -> Result<Message, Error> {
|
||||
let status = self.status.borrow().clone();
|
||||
let mut mem = vec![];
|
||||
for (node, status) in status.nodes.iter() {
|
||||
let state_info = if *node == self.id {
|
||||
self.state_info.clone()
|
||||
} else {
|
||||
status.state_info.clone()
|
||||
};
|
||||
mem.push(AdvertisedNode {
|
||||
id: *node,
|
||||
addr: status.addr,
|
||||
is_up: status.is_up(),
|
||||
last_seen: status.last_seen,
|
||||
state_info,
|
||||
});
|
||||
}
|
||||
Ok(Message::AdvertiseNodesUp(mem))
|
||||
Ok(Message::AdvertiseNodesUp(
|
||||
self.status.borrow().to_serializable_membership(self),
|
||||
))
|
||||
}
|
||||
|
||||
fn handle_pull_config(&self) -> Result<Message, Error> {
|
||||
|
@ -502,7 +494,7 @@ impl System {
|
|||
if has_changed {
|
||||
status.recalculate_hash();
|
||||
}
|
||||
update_lock.0.send(Arc::new(status))?;
|
||||
self.update_status(&update_lock, status).await;
|
||||
drop(update_lock);
|
||||
|
||||
if to_ping.len() > 0 {
|
||||
|
@ -522,7 +514,7 @@ impl System {
|
|||
|
||||
if adv.version > ring.config.version {
|
||||
let ring = Ring::new(adv.clone());
|
||||
update_lock.1.send(Arc::new(ring))?;
|
||||
update_lock.update_ring.send(Arc::new(ring))?;
|
||||
drop(update_lock);
|
||||
|
||||
self.background.spawn_cancellable(
|
||||
|
@ -537,7 +529,7 @@ impl System {
|
|||
}
|
||||
|
||||
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
||||
loop {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(PING_INTERVAL);
|
||||
|
||||
let status = self.status.borrow().clone();
|
||||
|
@ -552,34 +544,64 @@ impl System {
|
|||
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => {
|
||||
if *stop_signal.borrow() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn consul_loop(
|
||||
async fn discovery_loop(
|
||||
self: Arc<Self>,
|
||||
bootstrap_peers: Vec<SocketAddr>,
|
||||
consul_host: Option<String>,
|
||||
consul_service_name: Option<String>,
|
||||
mut stop_signal: watch::Receiver<bool>,
|
||||
consul_host: String,
|
||||
consul_service_name: String,
|
||||
) {
|
||||
while !*stop_signal.borrow() {
|
||||
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
|
||||
let consul_config = match (consul_host, consul_service_name) {
|
||||
(Some(ch), Some(csn)) => Some((ch, csn)),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
||||
Ok(mut node_list) => {
|
||||
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
|
||||
self.clone().ping_nodes(ping_addrs).await;
|
||||
while !*stop_signal.borrow() {
|
||||
let not_configured = self.ring.borrow().config.members.len() == 0;
|
||||
let no_peers = self.status.borrow().nodes.len() < 3;
|
||||
let bad_peers = self
|
||||
.status
|
||||
.borrow()
|
||||
.nodes
|
||||
.iter()
|
||||
.filter(|(_, v)| v.is_up())
|
||||
.count() != self.ring.borrow().config.members.len();
|
||||
|
||||
if not_configured || no_peers || bad_peers {
|
||||
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
|
||||
|
||||
let mut ping_list = bootstrap_peers
|
||||
.iter()
|
||||
.map(|ip| (*ip, None))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(peers) => {
|
||||
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not retrieve node list from Consul: {}", e);
|
||||
|
||||
if let Some((consul_host, consul_service_name)) = &consul_config {
|
||||
match get_consul_nodes(consul_host, consul_service_name).await {
|
||||
Ok(node_list) => {
|
||||
ping_list.extend(node_list.iter().map(|a| (*a, None)));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Could not retrieve node list from Consul: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.clone().ping_nodes(ping_list).await;
|
||||
}
|
||||
|
||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
||||
select! {
|
||||
_ = restart_at.fuse() => (),
|
||||
_ = stop_signal.changed().fuse() => (),
|
||||
|
@ -611,4 +633,35 @@ impl System {
|
|||
let _: Result<_, _> = self.handle_advertise_config(&config).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
|
||||
if status.hash != self.status.borrow().hash {
|
||||
let mut list = status.to_serializable_membership(&self);
|
||||
|
||||
// Combine with old peer list to make sure no peer is lost
|
||||
match self.persist_status.load_async().await {
|
||||
Ok(old_list) => {
|
||||
for pp in old_list {
|
||||
if !list.iter().any(|np| pp.id == np.id) {
|
||||
list.push(pp);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if list.len() > 0 {
|
||||
info!("Persisting new peer list ({} peers)", list.len());
|
||||
self.persist_status
|
||||
.save_async(&list)
|
||||
.await
|
||||
.expect("Unable to persist peer list");
|
||||
}
|
||||
}
|
||||
|
||||
updaters
|
||||
.update_status
|
||||
.send(Arc::new(status))
|
||||
.expect("Could not update internal membership status");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,13 @@ where
|
|||
F: TableSchema,
|
||||
R: TableReplication,
|
||||
{
|
||||
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
|
||||
pub fn new(
|
||||
system: Arc<System>,
|
||||
name: String,
|
||||
instance: F,
|
||||
replication: R,
|
||||
db: &sled::Db,
|
||||
) -> Arc<Self> {
|
||||
let store = db
|
||||
.open_tree(&format!("{}:table", name))
|
||||
.expect("Unable to open DB tree");
|
||||
|
|
|
@ -157,7 +157,12 @@ where
|
|||
if errs.is_empty() {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
|
||||
Err(Error::Message(
|
||||
errs.into_iter()
|
||||
.map(|x| format!("{}", x))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", "),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,12 +200,13 @@ where
|
|||
let subnode = self.read_node_txn(tx, &key_sub)?;
|
||||
match subnode {
|
||||
MerkleNode::Empty => {
|
||||
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
|
||||
warn!(
|
||||
"({}) Single subnode in tree is empty Merkle node",
|
||||
self.data.name
|
||||
);
|
||||
Some(MerkleNode::Empty)
|
||||
}
|
||||
MerkleNode::Intermediate(_) => {
|
||||
Some(MerkleNode::Intermediate(children))
|
||||
}
|
||||
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
|
||||
x @ MerkleNode::Leaf(_, _) => {
|
||||
tx.remove(key_sub.encode())?;
|
||||
Some(x)
|
||||
|
@ -239,14 +240,24 @@ where
|
|||
|
||||
{
|
||||
let exlf_subkey = key.next_key(&exlf_khash);
|
||||
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
|
||||
let exlf_sub_hash = self
|
||||
.update_item_rec(
|
||||
tx,
|
||||
&exlf_k[..],
|
||||
&exlf_khash,
|
||||
&exlf_subkey,
|
||||
Some(exlf_vhash),
|
||||
)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
|
||||
assert_eq!(int.len(), 1);
|
||||
}
|
||||
|
||||
{
|
||||
let key2 = key.next_key(khash);
|
||||
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
|
||||
let subhash = self
|
||||
.update_item_rec(tx, k, khash, &key2, new_vhash)?
|
||||
.unwrap();
|
||||
intermediate_set_child(&mut int, key2.prefix[i], subhash);
|
||||
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
|
||||
assert_eq!(int.len(), 1);
|
||||
|
|
|
@ -5,4 +5,5 @@ pub mod background;
|
|||
pub mod config;
|
||||
pub mod data;
|
||||
pub mod error;
|
||||
pub mod persister;
|
||||
pub mod time;
|
||||
|
|
72
src/util/persister.rs
Normal file
72
src/util/persister.rs
Normal file
|
@ -0,0 +1,72 @@
|
|||
use std::io::{Read, Write};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::data::*;
|
||||
use crate::error::Error;
|
||||
|
||||
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
|
||||
path: PathBuf,
|
||||
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> Persister<T>
|
||||
where
|
||||
T: Serialize + for<'de> Deserialize<'de>,
|
||||
{
|
||||
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
|
||||
let mut path = base_dir.clone();
|
||||
path.push(file_name);
|
||||
Self {
|
||||
path,
|
||||
_marker: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Result<T, Error> {
|
||||
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes)?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub fn save(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.open(&self.path)?;
|
||||
|
||||
file.write_all(&bytes[..])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_async(&self) -> Result<T, Error> {
|
||||
let mut file = tokio::fs::File::open(&self.path).await?;
|
||||
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes).await?;
|
||||
|
||||
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
|
||||
let bytes = rmp_to_vec_all_named(t)?;
|
||||
|
||||
let mut file = tokio::fs::File::create(&self.path).await?;
|
||||
file.write_all(&bytes[..]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue