Compare commits

...

26 commits

Author SHA1 Message Date
ab67bd88de
Try to fix Drone
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2021-04-05 23:41:50 +02:00
0f192a96b5
small simplify
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing
2021-04-05 23:21:25 +02:00
7b85056942
Merge discovery loop with consul
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing
2021-04-05 23:04:08 +02:00
7fd1f9a869
cargo fmt
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing
2021-04-05 20:42:57 +02:00
c5d8dc7d6d
Print stats
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:42:46 +02:00
fa11cb746a
Cargo fmt
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 20:35:26 +02:00
f11bd80d2a
Keep old data
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:33:24 +02:00
595dc0ed0d
Persist directly and not in background
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 20:26:01 +02:00
78eeaab5ed
Install rustfmt
Some checks reported errors
continuous-integration/drone/push Build encountered an error
2021-04-05 20:00:48 +02:00
22fbb3b892
Fix Drone CI signature
Some checks failed
continuous-integration/drone/push Build is failing
2021-04-05 19:58:42 +02:00
0eb5baea1a
Improve bootstraping: do it regularly; persist peer list
Some checks are pending
continuous-integration/drone/push Build is running
2021-04-05 19:55:53 +02:00
7d772737a5 Add signature to Drone CI
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 12:10:28 +02:00
4c2d8f5a96 test2
Some checks are pending
continuous-integration/drone/push Build is pending
2021-04-05 12:09:20 +02:00
0325086dac test 2021-04-05 12:08:05 +02:00
700925263f
Drone CI badge on branch main
All checks were successful
continuous-integration/drone/push Build is passing
2021-04-05 11:45:20 +02:00
6c83f66700 fix command for adding node
Some checks reported errors
continuous-integration/drone/push Build was killed
2021-04-02 23:23:14 +02:00
55e4a93bad Section on recovering from failures
Some checks reported errors
continuous-integration/drone/push Build was killed
2021-04-02 19:30:29 +02:00
LUXEY Adrien
9cb9945131 [doc] Added mention that GarageHQ is hosted with Garage
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-22 16:19:58 +01:00
7f9c1d5595 publish website only in correct repo
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-22 12:49:21 +01:00
991cf1b818 fix typos
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-19 17:55:10 +01:00
e26cb2640d Merge remote-tracking branch 'origin/feature/mdbook' 2021-03-19 17:49:57 +01:00
22041e924b Merge pull request 'intro.md: fix some typos, errors & dead links, plus some stylistic stuff' (#51) from adrien/garage:feature/mdbook into feature/mdbook
Reviewed-on: #51
2021-03-19 17:14:41 +01:00
LUXEY Adrien
b119e9d3c4 intro.md: fix some typos, errors & dead links, plus some stylistic stuff
modifié :         doc/book/src/intro.md
2021-03-19 16:51:05 +01:00
3e6534d7a8 Fix garage_util description
Some checks failed
continuous-integration/drone/push Build is failing
2021-03-18 11:46:29 +01:00
2dae4a25d6 Fix some typos
Some checks failed
continuous-integration/drone/push Build is failing
2021-03-18 11:35:50 +01:00
fcd566e89d Fix a table in the doc
All checks were successful
continuous-integration/drone/push Build is passing
2021-03-18 11:26:02 +01:00
17 changed files with 404 additions and 133 deletions

View file

@ -1,3 +1,4 @@
---
kind: pipeline
name: default
@ -35,7 +36,9 @@ steps:
commands:
- apt-get update
- apt-get install --yes libsodium-dev
- rustup component add rustfmt
- pwd
- cargo fmt -- --check
- cargo build
- name: cargo-test
@ -106,6 +109,15 @@ steps:
endpoint: https://garage.deuxfleurs.fr
region: garage
when:
event:
- push
branch:
- main
repo:
- Deuxfleurs/garage
---
kind: signature
hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e
...

View file

@ -2,7 +2,7 @@ BIN=target/release/garage
DOCKER=lxpz/garage_amd64
all:
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features
clear; cargo build
$(BIN):
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features

View file

@ -1,4 +1,4 @@
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage/status.svg?ref=refs/heads/main)](https://drone.deuxfleurs.fr/Deuxfleurs/garage)
===
<p align="center" style="text-align:center;">

View file

@ -14,6 +14,7 @@
- [Host a website](./cookbook/website.md)
- [Integrate as a media backend]()
- [Operate a cluster]()
- [Recovering from failures](./cookbook/recovering.md)
- [Reference Manual](./reference_manual/index.md)
- [Garage CLI]()

View file

@ -0,0 +1,99 @@
# Recovering from failures
Garage is meant to work on old, second-hand hardware.
In particular, this makes it likely that some of your drives will fail, and some manual intervention will be needed.
Fear not! For Garage is fully equipped to handle drive failures, in most common cases.
## A note on availability of Garage
With nodes dispersed in 3 datacenters or more, here are the guarantees Garage provides with the default replication strategy (3 copies of all data, which is the recommended value):
- The cluster remains fully functionnal as long as the machines that fail are in only one datacenter. This includes a whole datacenter going down due to power/Internet outage.
- No data is lost as long as the machines that fail are in at most two datacenters.
Of course this only works if your Garage nodes are correctly configured to be aware of the datacenter in which they are located.
Make sure this is the case using `garage status` to check on the state of your cluster's configuration.
## First option: removing a node
If you don't have spare parts (HDD, SDD) to replace the failed component, and if there are enough remaining nodes in your cluster
(at least 3), you can simply remove the failed node from Garage's configuration.
Note that if you **do** intend to replace the failed parts by new ones, using this method followed by adding back the node is **not recommended** (although it should work),
and you should instead use one of the methods detailed in the next sections.
Removing a node is done with the following command:
```
garage node remove --yes <node_id>
```
(you can get the `node_id` of the failed node by running `garage status`)
This will repartition the data and ensure that 3 copies of everything are present on the nodes that remain available.
## Replacement scenario 1: only data is lost, metadata is fine
The recommended deployment for Garage uses an SSD to store metadata, and an HDD to store blocks of data.
In the case where only a single HDD crashes, the blocks of data are lost but the metadata is still fine.
This is very easy to recover by setting up a new HDD to replace the failed one.
The node does not need to be fully replaced and the configuration doesn't need to change.
We just need to tell Garage to get back all the data blocks and store them on the new HDD.
First, set up a new HDD to store Garage's data directory on the failed node, and restart Garage using
the existing configuration. Then, run:
```
garage repair -a --yes blocks
```
This will re-synchronize blocks of data that are missing to the new HDD, reading them from copies located on other nodes.
You can check on the advancement of this process by doing the following command:
```
garage stats -a
```
Look out for the following output:
```
Block manager stats:
resync queue length: 26541
```
This indicates that one of the Garage node is in the process of retrieving missing data from other nodes.
This number decreases to zero when the node is fully synchronized.
## Replacement scenario 2: metadata (and possibly data) is lost
This scenario covers the case where a full node fails, i.e. both the metadata directory and
the data directory are lost, as well as the case where only the metadata directory is lost.
To replace the lost node, we will start from an empty metadata directory, which means
Garage will generate a new node ID for the replacement node.
We will thus need to remove the previous node ID from Garage's configuration and replace it by the ID of the new node.
If your data directory is stored on a separate drive and is still fine, you can keep it, but it is not necessary to do so.
In all cases, the data will be rebalanced and the replacement node will not store the same pieces of data
as were originally stored on the one that failed. So if you keep the data files, the rebalancing
might be faster but most of the pieces will be deleted anyway from the disk and replaced by other ones.
First, set up a new drive to store the metadata directory for the replacement node (a SSD is recommended),
and for the data directory if necessary. You can then start Garage on the new node.
The restarted node should generate a new node ID, and it should be shown as `NOT CONFIGURED` in `garage status`.
The ID of the lost node should be shown in `garage status` in the section for disconnected/unavailable nodes.
Then, replace the broken node by the new one, using:
```
garage node configure --replace <old_node_id> \
-c <capacity> -d <datacenter> -t <node_tag> <new_node_id>
```
Garage will then start synchronizing all required data on the new node.
This process can be monitored using the `garage stats -a` command.

View file

@ -124,8 +124,8 @@ For our example, we will suppose the following infrastructure:
|----------|---------|------------|------------|
| Paris | Mercury | fc00:1::1 | 1 To |
| Paris | Venus | fc00:1::2 | 2 To |
| London | Earth | fc00:1::2 | 2 To |
| Brussels | Mars | fc00:B::1 | 1.5 To |
| London | Earth | fc00:B::1 | 2 To |
| Brussels | Mars | fc00:F::1 | 1.5 To |
On each machine, we will have a similar setup, especially you must consider the following folders/files:
- `/etc/garage/pki`: Garage certificates, must be generated on your computer and copied on the servers

View file

@ -4,63 +4,68 @@
</a>
</p>
```
This very website is hosted using Garage. In other words: the doc is the PoC!
```
# The Garage Geo-Distributed Data Store
Garage is a lightweight geo-distributed data store.
It comes from the observation that despite numerous object stores
many people have broken data management policies (backup/replication on a single site or none at all).
To promote better data management policies, with focused on the following desirable properties:
To promote better data management policies, we focused on the following **desirable properties**:
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target hyperconverged infrastructures
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures
- **Simple**: simple to understand, simple to operate, simple to debug
- **Internet enabled**: made for multi-sites (eg. datacenter, offices, etc.) interconnected through a regular internet connection.
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
- **Simple**: simple to understand, simple to operate, simple to debug.
- **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
We also noted that the pursuit of some other goals are detrimental to our initial goals.
The following have been identified has non-goals, if it matters to you, you should not use Garage:
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
- **Extreme performances**: high performances constrain a lot the design and the infrastructure; we seek performances through minimalism only.
- **Feature extensiveness**: complete implementation of the S3 API or any other API to make garage a drop-in replacement is not targeted as it could lead to decisions impacting our desirable properties.
- **Storage optimizations**: erasure coding or any other coding technique both increase the difficulty of placing data and synchronizing; we limit ourselves to duplication.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such syncronizations are translated in network messages that impose severe constraints on the deployment.
- **POSIX/Filesystem compatibility**: we do not aim at being POSIX compatible or to emulate any kind of filesystem. Indeed, in a distributed environment, such synchronizations are translated in network messages that impose severe constraints on the deployment.
## Supported and planned protocols
Garage speaks (or will speak) the following protocols:
- [S3](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html) - *SUPPORTED* - Enable applications to store large blobs such as pictures, video, images, documents, etc. S3 is versatile enough to also be used to publish a static website.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good oerformances.
To keep performances optimals, most imap servers only support on-disk storage.
- [IMAP](https://github.com/go-pluto/pluto) - *PLANNED* - email storage is quite complex to get good performances.
To keep performances optimal, most IMAP servers only support on-disk storage.
We plan to add logic to Garage to make it a viable solution for email storage.
- *More to come*
## Use Cases
**[Deuxfleurs](https://deuxfleurs.fr) :** Garage is used by Deuxfleurs which is a non-profit hosting organization.
Especially, it is used to host their main website, this documentation and some of its members's blogs. Additionally,
Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html). Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and has the backend of [OCIS](https://github.com/owncloud/ocis).
**[Deuxfleurs](https://deuxfleurs.fr):** Garage is used by Deuxfleurs which is a non-profit hosting organization.
Especially, it is used to host their main website, this documentation and some of its members' blogs.
Additionally, Garage is used as a [backend for Nextcloud](https://docs.nextcloud.com/server/20/admin_manual/configuration_files/primary_storage.html).
Deuxfleurs also plans to use Garage as their [Matrix's media backend](https://github.com/matrix-org/synapse-s3-storage-provider) and as the backend of [OCIS](https://github.com/owncloud/ocis).
*Are you using Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add your organization here!*
## Comparison to existing software
**[Minio](https://min.io/) :** Minio shares our *self-contained & lightweight* goal but selected two of our non-goals: *storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, minio do not reach our desirable properties.
First, it fails on the *simple* property: due to the erasure coding, minio has severe limitations on how drives can be added or deleted from a cluster.
Second, it fails on the *interned enabled* property: due to its strong consistency, minio is latency sensitive.
Furthermore, minio has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[MinIO](https://min.io/):** MinIO shares our *Self-contained & lightweight* goal but selected two of our non-goals: *Storage optimizations* through erasure coding and *POSIX/Filesystem compatibility* through strong consistency.
However, by pursuing these two non-goals, MinIO do not reach our desirable properties.
Firstly, it fails on the *Simple* property: due to the erasure coding, MinIO has severe limitations on how drives can be added or deleted from a cluster.
Secondly, it fails on the *Internet enabled* property: due to its strong consistency, MinIO is latency sensitive.
Furthermore, MinIO has no knowledge of "sites" and thus can not distribute data to minimize the failure of a given site.
**[Openstack Swift](https://docs.openstack.org/swift/latest/) :**
OpenStack Swift at least fails on the *self-contained & lightweight* goal.
Starting it requires around 8Gb of RAM, which is too much especially in an hyperconverged infrastructure.
It seems also to be far from *Simple*.
**[Openstack Swift](https://docs.openstack.org/swift/latest/):**
OpenStack Swift at least fails on the *Self-contained & lightweight* goal.
Starting it requires around 8GB of RAM, which is too much especially in an hyperconverged infrastructure.
We also do not classify Swift as *Simple*.
**[Ceph](https://ceph.io/ceph-storage/object-storage/) :**
**[Ceph](https://ceph.io/ceph-storage/object-storage/):**
This review holds for the whole Ceph stack, including the RADOS paper, Ceph Object Storage module, the RADOS Gateway, etc.
At is core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and Minio are closer togethers than they are from Garage or OpenStack Swift.
At its core, Ceph has been designed to provide *POSIX/Filesystem compatibility* which requires strong consistency, which in turn
makes Ceph latency-sensitive and fails our *Internet enabled* goal.
Due to its industry oriented design, Ceph is also far from being *Simple* to operate and from being *Self-contained & lightweight* which makes it hard to integrate it in an hyperconverged infrastructure.
In a certain way, Ceph and MinIO are closer together than they are from Garage or OpenStack Swift.
*More comparisons are available in our [Related Work](design/related_work.md) chapter.*
@ -71,29 +76,29 @@ We reference here other places on the Internet where you can learn more about Ga
### Rust API (docs.rs)
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code documentation!
If you encounter a specific bug in Garage or plan to patch it, you may jump directly to the source code's documentation!
- [garage\_api](https://docs.rs/garage_api/latest/garage_api/) - contains the S3 standard API endpoint
- [garage\_model](https://docs.rs/garage_model/latest/garage_model/) - contains Garage's model built on the table abstraction
- [garage\_rpc](https://docs.rs/garage_rpc/latest/garage_rpc/) - contains Garage's federation protocol
- [garage\_table](https://docs.rs/garage_table/latest/garage_table/) - contains core Garage's CRDT datatypes
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage entrypoints (daemon, cli)
- [garage\_util](https://docs.rs/garage_util/latest/garage_util/) - contains garage helpers
- [garage\_web](https://docs.rs/garage_web/latest/garage_web/) - contains the S3 website endpoint
### Talks
We love to talk and hear about Garage, that's why we keep a log here:
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/master/doc/20201202_talk/talk.pdf)
- [(fr, 2020-12-02) Garage : jouer dans la cour des grands quand on est un hébergeur associatif](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/20201202_talk/talk.pdf)
*Did you write or talk about Garage? [Open a pull request](https://git.deuxfleurs.fr/Deuxfleurs/garage/) to add a link here!*
## Community
If you want to discuss with us, you can join our Matrix channel at [#garage:deuxfleurs.fr](https://matrix.to/#/#garage:deuxfleurs.fr).
Our code and our issue tracker, which is the place where you should report bugs, are managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
Our code repository and issue tracker, which is the place where you should report bugs, is managed on [Deuxfleurs' Gitea](https://git.deuxfleurs.fr/Deuxfleurs/garage).
## License
Garage, all the source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Garage's source code, is released under the [AGPL v3 License](https://www.gnu.org/licenses/agpl-3.0.en.html).
Please note that if you patch Garage and then use it to provide any service over a network, you must share your code!

View file

@ -246,15 +246,13 @@ impl AdminRpcHandler {
)))
}
KeyOperation::Import(query) => {
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id)
.await?;
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
if prev_key.is_some() {
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
self.garage.key_table.insert(&imported_key).await?;
Ok(AdminRPC::KeyInfo(imported_key))
}
}
}

View file

@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::data::UUID;
use garage_util::error::Error;
use garage_util::time::*;
use garage_rpc::membership::*;
@ -384,7 +384,10 @@ pub async fn cmd_status(
Ok(())
}
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> {
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = UUID>,
pattern: &str,
) -> Result<UUID, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
@ -428,7 +431,10 @@ pub async fn cmd_configure(
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node)));
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
}
}

View file

@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
let bootstrap = garage.system.clone().bootstrap(
&config.bootstrap_peers[..],
config.bootstrap_peers,
config.consul_host,
config.consul_service_name,
);

View file

@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
&self.key
}
fn is_tombstone(&self) -> bool {
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
self.versions.len() == 1
&& self.versions[0].state
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
}
}

View file

@ -11,13 +11,13 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::persister::Persister;
use garage_util::time::*;
use crate::consul::get_consul_nodes;
@ -26,7 +26,7 @@ use crate::rpc_client::*;
use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
pub struct System {
pub id: UUID,
metadata_dir: PathBuf,
persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
rpc_local_port: u16,
state_info: StateInfo,
@ -80,11 +81,16 @@ pub struct System {
pub(crate) status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
update_lock: Mutex<Updaters>,
pub background: Arc<BackgroundRunner>,
}
struct Updaters {
update_status: watch::Sender<Arc<Status>>,
update_ring: watch::Sender<Arc<Ring>>,
}
#[derive(Debug, Clone)]
pub struct Status {
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
@ -144,6 +150,25 @@ impl Status {
debug!("END --");
self.hash = blake2sum(nodes_txt.as_bytes());
}
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
let mut mem = vec![];
for (node, status) in self.nodes.iter() {
let state_info = if *node == system.id {
system.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
mem
}
}
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
}
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
.expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config)
}
impl System {
pub fn new(
metadata_dir: PathBuf,
@ -196,7 +204,10 @@ impl System {
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
info!("Node ID: {}", hex::encode(&id));
let net_config = match read_network_config(&metadata_dir) {
let persist_config = Persister::new(&metadata_dir, "network_config");
let persist_status = Persister::new(&metadata_dir, "peer_info");
let net_config = match persist_config.load() {
Ok(x) => x,
Err(e) => {
info!(
@ -206,6 +217,7 @@ impl System {
NetworkConfig::new()
}
};
let mut status = Status {
nodes: HashMap::new(),
hash: Hash::default(),
@ -231,14 +243,18 @@ impl System {
let sys = Arc::new(System {
id,
metadata_dir,
persist_config,
persist_status,
rpc_local_port: rpc_server.bind_addr.port(),
state_info,
rpc_http_client,
rpc_client,
status,
ring,
update_lock: Mutex::new((update_status, update_ring)),
update_lock: Mutex::new(Updaters {
update_status,
update_ring,
}),
background,
});
sys.clone().register_handler(rpc_server, rpc_path);
@ -272,14 +288,11 @@ impl System {
}
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let mut path = self.metadata_dir.clone();
path.push("network_config");
let ring = self.ring.borrow().clone();
let data = rmp_to_vec_all_named(&ring.config)?;
let mut f = tokio::fs::File::create(path.as_path()).await?;
f.write_all(&data[..]).await?;
self.persist_config
.save_async(&ring.config)
.await
.expect("Cannot save current cluster configuration");
Ok(())
}
@ -308,26 +321,21 @@ impl System {
pub async fn bootstrap(
self: Arc<Self>,
peers: &[SocketAddr],
peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
self.background
.spawn_worker(format!("discovery loop"), |stop_signal| {
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
});
let self2 = self.clone();
self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal)
});
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
}
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@ -394,9 +402,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
self.update_status(&update_locked, status).await;
drop(update_locked);
if to_advertise.len() > 0 {
@ -420,7 +426,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
update_locked.0.send(Arc::new(status))?;
self.update_status(&update_locked, status).await;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@ -436,23 +442,9 @@ impl System {
}
fn handle_pull_status(&self) -> Result<Message, Error> {
let status = self.status.borrow().clone();
let mut mem = vec![];
for (node, status) in status.nodes.iter() {
let state_info = if *node == self.id {
self.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
Ok(Message::AdvertiseNodesUp(
self.status.borrow().to_serializable_membership(self),
))
}
fn handle_pull_config(&self) -> Result<Message, Error> {
@ -502,7 +494,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
update_lock.0.send(Arc::new(status))?;
self.update_status(&update_lock, status).await;
drop(update_lock);
if to_ping.len() > 0 {
@ -522,7 +514,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
update_lock.1.send(Arc::new(ring))?;
update_lock.update_ring.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@ -537,7 +529,7 @@ impl System {
}
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
@ -552,34 +544,64 @@ impl System {
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => {
if *stop_signal.borrow() {
return;
}
}
_ = stop_signal.changed().fuse() => (),
}
}
}
async fn consul_loop(
async fn discovery_loop(
self: Arc<Self>,
bootstrap_peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
mut stop_signal: watch::Receiver<bool>,
consul_host: String,
consul_service_name: String,
) {
while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
let consul_config = match (consul_host, consul_service_name) {
(Some(ch), Some(csn)) => Some((ch, csn)),
_ => None,
};
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
self.clone().ping_nodes(ping_addrs).await;
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.len() == 0;
let no_peers = self.status.borrow().nodes.len() < 3;
let bad_peers = self
.status
.borrow()
.nodes
.iter()
.filter(|(_, v)| v.is_up())
.count() != self.ring.borrow().config.members.len();
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
let mut ping_list = bootstrap_peers
.iter()
.map(|ip| (*ip, None))
.collect::<Vec<_>>();
match self.persist_status.load_async().await {
Ok(peers) => {
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
}
_ => (),
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
if let Some((consul_host, consul_service_name)) = &consul_config {
match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
ping_list.extend(node_list.iter().map(|a| (*a, None)));
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
}
self.clone().ping_nodes(ping_list).await;
}
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (),
@ -611,4 +633,35 @@ impl System {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}
}
async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
if status.hash != self.status.borrow().hash {
let mut list = status.to_serializable_membership(&self);
// Combine with old peer list to make sure no peer is lost
match self.persist_status.load_async().await {
Ok(old_list) => {
for pp in old_list {
if !list.iter().any(|np| pp.id == np.id) {
list.push(pp);
}
}
}
_ => (),
}
if list.len() > 0 {
info!("Persisting new peer list ({} peers)", list.len());
self.persist_status
.save_async(&list)
.await
.expect("Unable to persist peer list");
}
}
updaters
.update_status
.send(Arc::new(status))
.expect("Could not update internal membership status");
}
}

View file

@ -35,7 +35,13 @@ where
F: TableSchema,
R: TableReplication,
{
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
pub fn new(
system: Arc<System>,
name: String,
instance: F,
replication: R,
db: &sled::Db,
) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");

View file

@ -157,7 +157,12 @@ where
if errs.is_empty() {
Ok(true)
} else {
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", ")))
Err(Error::Message(
errs.into_iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
))
}
}

View file

@ -200,12 +200,13 @@ where
let subnode = self.read_node_txn(tx, &key_sub)?;
match subnode {
MerkleNode::Empty => {
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name);
warn!(
"({}) Single subnode in tree is empty Merkle node",
self.data.name
);
Some(MerkleNode::Empty)
}
MerkleNode::Intermediate(_) => {
Some(MerkleNode::Intermediate(children))
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
tx.remove(key_sub.encode())?;
Some(x)
@ -239,14 +240,24 @@ where
{
let exlf_subkey = key.next_key(&exlf_khash);
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap();
let exlf_sub_hash = self
.update_item_rec(
tx,
&exlf_k[..],
&exlf_khash,
&exlf_subkey,
Some(exlf_vhash),
)?
.unwrap();
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
assert_eq!(int.len(), 1);
}
{
let key2 = key.next_key(khash);
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap();
let subhash = self
.update_item_rec(tx, k, khash, &key2, new_vhash)?
.unwrap();
intermediate_set_child(&mut int, key2.prefix[i], subhash);
if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
assert_eq!(int.len(), 1);

View file

@ -5,4 +5,5 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
pub mod persister;
pub mod time;

72
src/util/persister.rs Normal file
View file

@ -0,0 +1,72 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
impl<T> Persister<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
let mut path = base_dir.clone();
path.push(file_name);
Self {
path,
_marker: Default::default(),
}
}
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.path)?;
file.write_all(&bytes[..])?;
Ok(())
}
pub async fn load_async(&self) -> Result<T, Error> {
let mut file = tokio::fs::File::open(&self.path).await?;
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;
Ok(())
}
}