WIP: Per-Bucket Consistency #748

Closed
yuka wants to merge 4 commits from consistency-mode into next-0.10
58 changed files with 916 additions and 450 deletions

View file

@ -116,7 +116,7 @@ metadata_dir = "/var/lib/garage/meta"
data_dir = "/var/lib/garage/data"
db_engine = "lmdb"
replication_mode = "3"
replication_factor = 3
compression_level = 2

View file

@ -12,7 +12,7 @@ An introduction to building cluster layouts can be found in the [production depl
In Garage, all of the data that can be stored in a given cluster is divided
into slices which we call *partitions*. Each partition is stored by
one or several nodes in the cluster
(see [`replication_mode`](@/documentation/reference-manual/configuration.md#replication_mode)).
(see [`replication_factor`](@/documentation/reference-manual/configuration.md#replication_factor)).
The layout determines the correspondence between these partitions,
which exist on a logical level, and actual storage nodes.

View file

@ -59,7 +59,7 @@ metadata_dir = "/tmp/meta"
data_dir = "/tmp/data"
db_engine = "lmdb"
replication_mode = "none"
replication_factor = 1
rpc_bind_addr = "[::]:3901"
rpc_public_addr = "127.0.0.1:3901"

View file

@ -8,7 +8,8 @@ weight = 20
Here is an example `garage.toml` configuration file that illustrates all of the possible options:
```toml
replication_mode = "3"
replication_factor = 3
consistency_mode = "consistent"
metadata_dir = "/var/lib/garage/meta"
data_dir = "/var/lib/garage/data"
@ -90,7 +91,8 @@ Top-level configuration options:
[`lmdb_map_size`](#lmdb_map_size),
[`metadata_dir`](#metadata_dir),
[`metadata_fsync`](#metadata_fsync),
[`replication_mode`](#replication_mode),
[`replication_factor`](#replication_factor),
[`consistency_mode`](#consistency_mode),
[`rpc_bind_addr`](#rpc_bind_addr),
[`rpc_bind_outgoing`](#rpc_bind_outgoing),
[`rpc_public_addr`](#rpc_public_addr),
@ -133,11 +135,12 @@ The `[admin]` section:
### Top-level configuration options
#### `replication_mode` {#replication_mode}
#### `replication_factor` {#replication_factor}
Garage supports the following replication modes:
The replication factor can be any positive integer smaller or equal the node count in your cluster.
The chosen replication factor has a big impact on the cluster's failure tolerancy and performance characteristics.
- `none` or `1`: data stored on Garage is stored on a single node. There is no
- `1`: data stored on Garage is stored on a single node. There is no
redundancy, and data will be unavailable as soon as one node fails or its
network is disconnected. Do not use this for anything else than test
deployments.
@ -148,17 +151,6 @@ Garage supports the following replication modes:
before losing data. Data remains available in read-only mode when one node is
down, but write operations will fail.
- `2-dangerous`: a variant of mode `2`, where written objects are written to
the second replica asynchronously. This means that Garage will return `200
OK` to a PutObject request before the second copy is fully written (or even
before it even starts being written). This means that data can more easily
be lost if the node crashes before a second copy can be completed. This
also means that written objects might not be visible immediately in read
operations. In other words, this mode severely breaks the consistency and
durability guarantees of standard Garage cluster operation. Benefits of
this mode: you can still write to your cluster when one node is
unavailable.
- `3`: data stored on Garage will be stored on three different nodes, if
possible each in a different zones. Garage tolerates two node failure, or
several node failures but in no more than two zones (in a deployment with at
@ -166,55 +158,84 @@ Garage supports the following replication modes:
or node failures are only in a single zone, reading and writing data to
Garage can continue normally.
- `3-degraded`: a variant of replication mode `3`, that lowers the read
quorum to `1`, to allow you to read data from your cluster when several
nodes (or nodes in several zones) are unavailable. In this mode, Garage
does not provide read-after-write consistency anymore. The write quorum is
still 2, ensuring that data successfully written to Garage is stored on at
least two nodes.
- `3-dangerous`: a variant of replication mode `3` that lowers both the read
and write quorums to `1`, to allow you to both read and write to your
cluster when several nodes (or nodes in several zones) are unavailable. It
is the least consistent mode of operation proposed by Garage, and also one
that should probably never be used.
- `5`, `7`, ...: When setting the replication factor above 3, it is most useful to
choose an uneven value, since for every two copies added, one more node can fail
before losing the ability to write and read to the cluster.
Note that in modes `2` and `3`,
if at least the same number of zones are available, an arbitrary number of failures in
any given zone is tolerated as copies of data will be spread over several zones.
**Make sure `replication_mode` is the same in the configuration files of all nodes.
**Make sure `replication_factor` is the same in the configuration files of all nodes.
Never run a Garage cluster where that is not the case.**
It is technically possible to change the replication factor although it's a
dangerous operation that is not officially supported. This requires you to
delete the existing cluster layout and create a new layout from scratch,
meaning that a full rebalancing of your cluster's data will be needed. To do
it, shut down your cluster entirely, delete the `custer_layout` files in the
meta directories of all your nodes, update all your configuration files with
the new `replication_factor` parameter, restart your cluster, and then create a
new layout with all the nodes you want to keep. Rebalancing data will take
some time, and data might temporarily appear unavailable to your users.
It is recommended to shut down public access to the cluster while rebalancing
is in progress. In theory, no data should be lost as rebalancing is a
routine operation for Garage, although we cannot guarantee you that everything
will go right in such an extreme scenario.
#### `consistency_mode` {#consistency_mode}
The consistency mode setting determines the read and write behaviour of your cluster.
- `consistent`: The default setting. This is what the paragraph above describes.
The read and write quorum will be determined so that read-after-write consistency
is guaranteed.
- `degraded`: Lowers the read
quorum to `1`, to allow you to read data from your cluster when several
nodes (or nodes in several zones) are unavailable. In this mode, Garage
does not provide read-after-write consistency anymore.
The write quorum stays the same as in the `consistent` mode, ensuring that
data successfully written to Garage is stored on multiple nodes (depending
the replication factor).
- `dangerous`: This mode lowers both the read
and write quorums to `1`, to allow you to both read and write to your
cluster when several nodes (or nodes in several zones) are unavailable. It
is the least consistent mode of operation proposed by Garage, and also one
that should probably never be used.
Changing the `consistency_mode` between modes while leaving the `replication_factor` untouched
(e.g. setting your node's `consistency_mode` to `degraded` when it was previously unset, or from
`dangerous` to `consistent`), can be done easily by just changing the `consistency_mode`
parameter in your config files and restarting all your Garage nodes.
The consistency mode can be used together with various replication factors, to achieve
a wide range of read and write characteristics. Some examples:
- Replication factor `2`, consistency mode `degraded`: While this mode
technically exists, its properties are the same as with consistency mode `consistent`,
since the read quorum with replication factor `2`, consistency mode `consistent` is already 1.
- Replication factor `2`, consistency mode `dangerous`: written objects are written to
the second replica asynchronously. This means that Garage will return `200
OK` to a PutObject request before the second copy is fully written (or even
before it even starts being written). This means that data can more easily
be lost if the node crashes before a second copy can be completed. This
also means that written objects might not be visible immediately in read
operations. In other words, this configuration severely breaks the consistency and
durability guarantees of standard Garage cluster operation. Benefits of
this configuration: you can still write to your cluster when one node is
unavailable.
The quorums associated with each replication mode are described below:
| `replication_mode` | Number of replicas | Write quorum | Read quorum | Read-after-write consistency? |
| ------------------ | ------------------ | ------------ | ----------- | ----------------------------- |
| `none` or `1` | 1 | 1 | 1 | yes |
| `2` | 2 | 2 | 1 | yes |
| `2-dangerous` | 2 | 1 | 1 | NO |
| `3` | 3 | 2 | 2 | yes |
| `3-degraded` | 3 | 2 | 1 | NO |
| `3-dangerous` | 3 | 1 | 1 | NO |
Changing the `replication_mode` between modes with the same number of replicas
(e.g. from `3` to `3-degraded`, or from `2-dangerous` to `2`), can be done easily by
just changing the `replication_mode` parameter in your config files and restarting all your
Garage nodes.
It is also technically possible to change the replication mode to a mode with a
different numbers of replicas, although it's a dangerous operation that is not
officially supported. This requires you to delete the existing cluster layout
and create a new layout from scratch, meaning that a full rebalancing of your
cluster's data will be needed. To do it, shut down your cluster entirely,
delete the `custer_layout` files in the meta directories of all your nodes,
update all your configuration files with the new `replication_mode` parameter,
restart your cluster, and then create a new layout with all the nodes you want
to keep. Rebalancing data will take some time, and data might temporarily
appear unavailable to your users. It is recommended to shut down public access
to the cluster while rebalancing is in progress. In theory, no data should be
lost as rebalancing is a routine operation for Garage, although we cannot
guarantee you that everything will go right in such an extreme scenario.
| `consistency_mode` | `replication_factor` | Write quorum | Read quorum | Read-after-write consistency? |
| ------------------ | -------------------- | ------------ | ----------- | ----------------------------- |
| `consistent` | 1 | 1 | 1 | yes |
| `consistent` | 2 | 2 | 1 | yes |
| `dangerous` | 2 | 1 | 1 | NO |
| `consistent` | 3 | 2 | 2 | yes |
| `degraded` | 3 | 2 | 1 | NO |
| `dangerous` | 3 | 1 | 1 | NO |
#### `metadata_dir` {#metadata_dir}

View file

@ -39,10 +39,10 @@ Read about cluster layout management [here](@/documentation/operations/layout.md
### Several replication modes
Garage supports a variety of replication modes, with 1 copy, 2 copies or 3 copies of your data,
Garage supports a variety of replication modes, with configurable replica count,
and with various levels of consistency, in order to adapt to a variety of usage scenarios.
Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_mode)
to select the replication mode best suited to your use case (hint: in most cases, `replication_mode = "3"` is what you want).
Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_factor)
to select the replication mode best suited to your use case (hint: in most cases, `replication_factor = 3` is what you want).
### Compression and deduplication

View file

@ -38,7 +38,7 @@ data_dir = "/tmp/garage-data-$count"
rpc_bind_addr = "0.0.0.0:$((3900+$count))" # the port other Garage nodes will use to talk to this node
rpc_public_addr = "127.0.0.1:$((3900+$count))"
bootstrap_peers = []
replication_mode = "3"
replication_factor = 3
rpc_secret = "$NETWORK_SECRET"
[s3_api]

View file

@ -18,9 +18,9 @@ garage:
sledCacheCapacity: "134217728"
sledFlushEveryMs: "2000"
# Default to 3 replicas, see the replication_mode section at
# https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#replication-mode
replicationMode: "3"
# Default to 3 replicas, see the replication_factor section at
# https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#replication-factor
replicationFactor: 3
# zstd compression level of stored blocks
# https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#compression-level
@ -55,7 +55,7 @@ garage:
sled_flush_every_ms = {{ .Values.garage.sledFlushEveryMs }}
{{- end }}
replication_mode = "{{ .Values.garage.replicationMode }}"
replication_factor = {{ .Values.garage.replicationFactor }}
compression_level = {{ .Values.garage.compressionLevel }}

View file

@ -43,7 +43,7 @@
"rpc_bind_addr = \"0.0.0.0:3901\"\n"
"rpc_public_addr = \"" node ":3901\"\n"
"db_engine = \"lmdb\"\n"
"replication_mode = \"2\"\n"
"replication_factor = 2\n"
"data_dir = \"" data-dir "\"\n"
"metadata_dir = \"" meta-dir "\"\n"
"[s3_api]\n"

View file

@ -8,7 +8,7 @@ data:
metadata_dir = "/tmp/meta"
data_dir = "/tmp/data"
replication_mode = "3"
replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret = "1799bccfd7411eddcf9ebd316bc1f5287ad12a68094e1c6ac6abde7e6feae1ec"

View file

@ -10,6 +10,8 @@ use garage_util::time::*;
use garage_table::*;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@ -27,6 +29,7 @@ pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBod
let buckets = garage
.bucket_table
.get_range(
(),
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
@ -117,11 +120,13 @@ async fn bucket_info_results(
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option().unwrap();
let c = *bucket_state.consistency_mode.get();
let counters = garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -129,7 +134,7 @@ async fn bucket_info_results(
let mpu_counters = garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -145,7 +150,7 @@ async fn bucket_info_results(
{
if let Some(key) = garage
.key_table
.get(&EmptyKey, k)
.get((), &EmptyKey, k)
.await?
.filter(|k| !k.is_deleted())
{
@ -165,7 +170,7 @@ async fn bucket_info_results(
if relevant_keys.contains_key(k) {
continue;
}
if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
if let Some(key) = garage.key_table.get((), &EmptyKey, k).await? {
if !key.state.is_deleted() {
relevant_keys.insert(k.clone(), key);
}
@ -185,6 +190,7 @@ async fn bucket_info_results(
.filter(|(_, _, a)| *a)
.map(|(n, _, _)| n.to_string())
.collect::<Vec<_>>(),
consistency_mode: *state.consistency_mode.get(),
website_access: state.website_config.get().is_some(),
website_config: state.website_config.get().clone().map(|wsc| {
GetBucketInfoWebsiteResult {
@ -238,6 +244,7 @@ async fn bucket_info_results(
struct GetBucketInfoResult {
id: String,
global_aliases: Vec<String>,
consistency_mode: ConsistencyMode,
website_access: bool,
#[serde(default)]
website_config: Option<GetBucketInfoWebsiteResult>,
@ -283,7 +290,7 @@ pub async fn handle_create_bucket(
)));
}
if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
if let Some(alias) = garage.bucket_alias_table.get((), &EmptyKey, ga).await? {
if alias.state.get().is_some() {
return Err(CommonError::BucketAlreadyExists.into());
}
@ -306,7 +313,7 @@ pub async fn handle_create_bucket(
}
let bucket = Bucket::new();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
if let Some(ga) = &req.global_alias {
helper.set_global_bucket_alias(bucket.id, ga).await?;
@ -394,7 +401,7 @@ pub async fn handle_delete_bucket(
// 4. delete bucket
bucket.state = Deletable::delete();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -416,6 +423,10 @@ pub async fn handle_update_bucket(
let state = bucket.state.as_option_mut().unwrap();
if let Some(cm) = req.consistency_mode {
state.consistency_mode.update(cm);
}
if let Some(wa) = req.website_access {
if wa.enabled {
state.website_config.update(Some(WebsiteConfig {
@ -441,7 +452,7 @@ pub async fn handle_update_bucket(
});
}
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
bucket_info_results(garage, bucket_id).await
}
@ -449,6 +460,7 @@ pub async fn handle_update_bucket(
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateBucketRequest {
consistency_mode: Option<ConsistencyMode>,
website_access: Option<UpdateBucketWebsiteAccess>,
quotas: Option<ApiBucketQuotas>,
}

View file

@ -17,6 +17,7 @@ pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>,
let res = garage
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
@ -68,7 +69,7 @@ pub async fn handle_create_key(
let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
garage.key_table.insert((), &key).await?;
key_info_results(garage, key, true).await
}
@ -85,7 +86,10 @@ pub async fn handle_import_key(
) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
let prev_key = garage
.key_table
.get((), &EmptyKey, &req.access_key_id)
.await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
}
@ -96,7 +100,7 @@ pub async fn handle_import_key(
req.name.as_deref().unwrap_or("Imported key"),
)
.ok_or_bad_request("Invalid key format")?;
garage.key_table.insert(&imported_key).await?;
garage.key_table.insert((), &imported_key).await?;
key_info_results(garage, imported_key, false).await
}
@ -134,7 +138,7 @@ pub async fn handle_update_key(
}
}
garage.key_table.insert(&key).await?;
garage.key_table.insert((), &key).await?;
key_info_results(garage, key, false).await
}
@ -184,7 +188,7 @@ async fn key_info_results(
.filter_map(|(_, _, v)| v.as_ref()),
) {
if !relevant_buckets.contains_key(id) {
if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? {
if let Some(b) = garage.bucket_table.get((), &EmptyKey, id).await? {
if b.state.as_option().is_some() {
relevant_buckets.insert(*id, b);
}

View file

@ -17,7 +17,10 @@ pub async fn handle_insert_batch(
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
@ -35,7 +38,11 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
garage
.k2v
.rpc
.insert_batch(*bucket_params.consistency_mode.get(), *bucket_id, items2)
.await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -68,8 +75,12 @@ async fn handle_read_batch_query(
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let partition = K2VItemPartition {
bucket_id: *bucket_id,
@ -92,7 +103,7 @@ async fn handle_read_batch_query(
let item = garage
.k2v
.item_table
.get(&partition, sk)
.get(c, &partition, sk)
.await?
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item {
@ -109,6 +120,7 @@ async fn handle_read_batch_query(
query.limit,
Some(filter),
EnumerationOrder::from_reverse(query.reverse),
c,
)
.await?;
@ -162,8 +174,12 @@ async fn handle_delete_batch_query(
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let partition = K2VItemPartition {
bucket_id: *bucket_id,
@ -186,7 +202,7 @@ async fn handle_delete_batch_query(
let item = garage
.k2v
.item_table
.get(&partition, sk)
.get(c, &partition, sk)
.await?
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item {
@ -196,6 +212,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
c,
*bucket_id,
i.partition.partition_key,
i.sort_key,
@ -217,6 +234,7 @@ async fn handle_delete_batch_query(
None,
Some(filter),
EnumerationOrder::Forward,
c,
)
.await?;
assert!(!more);
@ -236,7 +254,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
garage.k2v.rpc.insert_batch(c, *bucket_id, items).await?;
n
};
@ -257,7 +275,10 @@ pub(crate) async fn handle_poll_range(
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
use garage_model::k2v::sub::PollRange;
@ -269,6 +290,7 @@ pub(crate) async fn handle_poll_range(
.k2v
.rpc
.poll_range(
*bucket_params.consistency_mode.get(),
PollRange {
partition: K2VItemPartition {
bucket_id,

View file

@ -19,7 +19,10 @@ pub async fn handle_read_index(
reverse: Option<bool>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let reverse = reverse.unwrap_or(false);
@ -39,6 +42,7 @@ pub async fn handle_read_index(
limit,
Some((DeletedFilter::NotDeleted, node_id_vec)),
EnumerationOrder::from_reverse(reverse),
*bucket_params.consistency_mode.get(),
)
.await?;

View file

@ -101,7 +101,10 @@ pub async fn handle_read_item(
sort_key: &String,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let format = ReturnFormat::from(req)?;
@ -110,6 +113,7 @@ pub async fn handle_read_item(
.k2v
.item_table
.get(
*bucket_params.consistency_mode.get(),
&K2VItemPartition {
bucket_id: *bucket_id,
partition_key: partition_key.to_string(),
@ -129,7 +133,10 @@ pub async fn handle_insert_item(
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let causal_context = req
.headers()
@ -149,6 +156,7 @@ pub async fn handle_insert_item(
.k2v
.rpc
.insert(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
@ -169,7 +177,10 @@ pub async fn handle_delete_item(
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let causal_context = req
.headers()
@ -185,6 +196,7 @@ pub async fn handle_delete_item(
.k2v
.rpc
.insert(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
@ -209,7 +221,10 @@ pub async fn handle_poll_item(
timeout_secs: Option<u64>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let format = ReturnFormat::from(req)?;
@ -222,6 +237,7 @@ pub async fn handle_poll_item(
.k2v
.rpc
.poll_item(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key,
sort_key,

View file

@ -4,6 +4,7 @@
use std::sync::Arc;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
@ -22,6 +23,7 @@ pub(crate) async fn read_range<F>(
limit: Option<u64>,
filter: Option<F::Filter>,
enumeration_order: EnumerationOrder,
c: ConsistencyMode,
) -> Result<(Vec<F::E>, bool, Option<String>), Error>
where
F: TableSchema<S = String> + 'static,
@ -54,6 +56,7 @@ where
);
let get_ret = table
.get_range(
c,
partition_key,
start.clone(),
filter.clone(),

View file

@ -70,10 +70,10 @@ pub async fn handle_list_buckets(
let mut aliases = HashMap::new();
for bucket_id in ids.iter() {
let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?;
let bucket = garage.bucket_table.get((), &EmptyKey, bucket_id).await?;
if let Some(bucket) = bucket {
for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) {
let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
let alias_opt = garage.bucket_alias_table.get((), &EmptyKey, alias).await?;
if let Some(alias_ent) = alias_opt {
if *alias_ent.state.get() == Some(*bucket_id) {
aliases.insert(alias_ent.name().to_string(), *bucket_id);
@ -187,7 +187,7 @@ pub async fn handle_create_bucket(
}
let bucket = Bucket::new();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
helper
.set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
@ -268,7 +268,7 @@ pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Erro
state: Deletable::delete(),
};
// 3. delete bucket
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
} else if is_local_alias {
// Just unalias
helper

View file

@ -9,6 +9,7 @@ use hyper::{Request, Response};
use serde::Serialize;
use garage_net::bytes_buf::BytesBuf;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
@ -33,13 +34,15 @@ pub async fn handle_copy(
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&ctx, req).await?;
let (source_object, source_c) = get_copy_source(&ctx, req).await?;
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
bucket_params: dest_bucket_params,
..
} = ctx;
let dest_c = *dest_bucket_params.consistency_mode.get();
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -80,13 +83,13 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
garage.object_table.insert(dest_c, &dest_object).await?;
}
ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
// Get block list from source version
let source_version = garage
.version_table
.get(&source_version.uuid, &EmptyKey)
.get(source_c, &source_version.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NoSuchKey)?;
@ -106,7 +109,7 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![tmp_dest_object_version],
);
garage.object_table.insert(&tmp_dest_object).await?;
garage.object_table.insert(dest_c, &tmp_dest_object).await?;
// Write version in the version table. Even with empty block list,
// this means that the BlockRef entries linked to this version cannot be
@ -120,7 +123,7 @@ pub async fn handle_copy(
},
false,
);
garage.version_table.insert(&dest_version).await?;
garage.version_table.insert(dest_c, &dest_version).await?;
// Fill in block list for version and insert block refs
for (bk, bv) in source_version.blocks.items().iter() {
@ -137,8 +140,10 @@ pub async fn handle_copy(
})
.collect::<Vec<_>>();
futures::try_join!(
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
garage.version_table.insert(dest_c, &dest_version),
garage
.block_ref_table
.insert_many(dest_c, &dest_block_refs[..]),
)?;
// Insert final object
@ -160,7 +165,7 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
garage.object_table.insert(dest_c, &dest_object).await?;
}
}
@ -193,12 +198,17 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
let ((source_object, source_c), (_, _, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
let ReqCtx { garage, .. } = ctx;
let ReqCtx {
garage,
bucket_params: dest_bucket_params,
..
} = ctx;
let dest_c = *dest_bucket_params.consistency_mode.get();
let (source_object_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -244,7 +254,7 @@ pub async fn handle_upload_part_copy(
// and destination version to check part hasn't yet been uploaded
let source_version = garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
.get(source_c, &source_object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -304,7 +314,7 @@ pub async fn handle_upload_part_copy(
size: None,
},
);
garage.mpu_table.insert(&dest_mpu).await?;
garage.mpu_table.insert(dest_c, &dest_mpu).await?;
let mut dest_version = Version::new(
dest_version_id,
@ -390,7 +400,7 @@ pub async fn handle_upload_part_copy(
if must_upload {
garage2
.block_manager
.rpc_put_block(final_hash, data, None)
.rpc_put_block(dest_c, final_hash, data, None)
.await
} else {
Ok(())
@ -398,9 +408,9 @@ pub async fn handle_upload_part_copy(
},
async {
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&dest_version).await?;
garage.version_table.insert(dest_c, &dest_version).await?;
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref).await
garage.block_ref_table.insert(dest_c, &block_ref).await
},
// Thing 4: we need to prefetch the next block
defragmenter.next(),
@ -422,7 +432,7 @@ pub async fn handle_upload_part_copy(
size: Some(current_offset),
},
);
garage.mpu_table.insert(&dest_mpu).await?;
garage.mpu_table.insert(dest_c, &dest_mpu).await?;
// LGTM
let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult {
@ -440,24 +450,33 @@ pub async fn handle_upload_part_copy(
.body(string_body(resp_xml))?)
}
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
async fn get_copy_source(
ctx: &ReqCtx,
req: &Request<ReqBody>,
) -> Result<(Object, ConsistencyMode), Error> {
let ReqCtx {
garage, api_key, ..
} = ctx;
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
let (source_bucket_name, source_key) = parse_bucket_key(&copy_source, None)?;
let source_bucket_id = garage
.bucket_helper()
.resolve_bucket(&source_bucket.to_string(), api_key)
.resolve_bucket(&source_bucket_name.to_string(), api_key)
.await?;
let source_bucket = garage
.bucket_helper()
.get_existing_bucket(source_bucket_id)
.await?;
let source_bucket_state = source_bucket.state.as_option().unwrap();
let source_c = *source_bucket_state.consistency_mode.get();
if !api_key.allow_read(&source_bucket_id) {
return Err(Error::forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
source_bucket_name
)));
}
@ -465,11 +484,11 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
let source_object = garage
.object_table
.get(&source_bucket_id, &source_key.to_string())
.get(source_c, &source_bucket_id, &source_key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
Ok(source_object)
Ok((source_object, source_c))
}
fn extract_source_info(

View file

@ -57,7 +57,7 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error>
bucket_params.cors_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -91,7 +91,7 @@ pub async fn handle_put_cors(
.update(Some(conf.into_garage_cors_config()?));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -14,11 +14,16 @@ use crate::signature::verify_signed_content;
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(bucket_id, &key.to_string())
.get(c, bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?; // No need to delete
@ -49,7 +54,7 @@ async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid),
}],
);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
Ok((deleted_version, del_uuid))
}

View file

@ -14,11 +14,13 @@ use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
use garage_net::stream::ByteStream;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
@ -136,7 +138,15 @@ pub async fn handle_head(
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
handle_head_without_ctx(
ctx.garage,
req,
ctx.bucket_id,
&ctx.bucket_params,
key,
part_number,
)
.await
}
/// Handle HEAD request for website
@ -144,12 +154,15 @@ pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
.get(c, &bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
@ -194,7 +207,7 @@ pub async fn handle_head_without_ctx(
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.get(c, &object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -234,7 +247,16 @@ pub async fn handle_get(
part_number: Option<u64>,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
handle_get_without_ctx(
ctx.garage,
req,
ctx.bucket_id,
&ctx.bucket_params,
key,
part_number,
overrides,
)
.await
}
/// Handle GET request
@ -242,13 +264,16 @@ pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
.get(c, &bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
@ -277,10 +302,11 @@ pub async fn handle_get_without_ctx(
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
(Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
(Some(pn), None) => handle_get_part(garage, c, last_v, last_v_data, last_v_meta, pn).await,
(None, Some(range)) => {
handle_get_range(
garage,
c,
last_v,
last_v_data,
last_v_meta,
@ -289,12 +315,15 @@ pub async fn handle_get_without_ctx(
)
.await
}
(None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
(None, None) => {
handle_get_full(garage, c, last_v, last_v_data, last_v_meta, overrides).await
}
}
}
async fn handle_get_full(
garage: Arc<Garage>,
c: ConsistencyMode,
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -321,7 +350,7 @@ async fn handle_get_full(
match async {
let garage2 = garage.clone();
let version_fut = tokio::spawn(async move {
garage2.version_table.get(&version_uuid, &EmptyKey).await
garage2.version_table.get(c, &version_uuid, &EmptyKey).await
});
let stream_block_0 = garage
@ -362,6 +391,7 @@ async fn handle_get_full(
async fn handle_get_range(
garage: Arc<Garage>,
c: ConsistencyMode,
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -394,7 +424,7 @@ async fn handle_get_range(
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage
.version_table
.get(&version.uuid, &EmptyKey)
.get(c, &version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -406,6 +436,7 @@ async fn handle_get_range(
async fn handle_get_part(
garage: Arc<Garage>,
c: ConsistencyMode,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -432,7 +463,7 @@ async fn handle_get_part(
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.get(c, &object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;

View file

@ -44,7 +44,7 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E
bucket_params.lifecycle_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -78,7 +78,7 @@ pub async fn handle_put_lifecycle(
bucket_params.lifecycle_config.update(Some(config));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -63,11 +63,17 @@ pub async fn handle_list(
ctx: ReqCtx,
query: &ListObjectsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
t.get_range(
c,
&bucket,
key,
Some(ObjectFilter::IsData),
@ -169,12 +175,18 @@ pub async fn handle_list_multipart_upload(
ctx: ReqCtx,
query: &ListMultipartUploadsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
t.get_range(
c,
&bucket,
key,
Some(ObjectFilter::IsUploading {

View file

@ -5,6 +5,7 @@ use futures::prelude::*;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::*;
use garage_util::data::*;
@ -32,9 +33,12 @@ pub async fn handle_create_multipart_upload(
garage,
bucket_id,
bucket_name,
bucket_params,
..
} = &ctx;
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let c = *bucket_params.consistency_mode.get();
let existing_object = garage.object_table.get(c, &bucket_id, key).await?;
let upload_id = gen_uuid();
let timestamp = next_timestamp(existing_object.as_ref());
@ -51,13 +55,13 @@ pub async fn handle_create_multipart_upload(
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// Create multipart upload in mpu table
// This multipart upload will hold references to uploaded parts
// (which are entries in the Version table)
let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
// Send success response
let result = s3_xml::InitiateMultipartUploadResult {
@ -79,7 +83,12 @@ pub async fn handle_put_part(
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let upload_id = decode_upload_id(upload_id)?;
@ -112,6 +121,7 @@ pub async fn handle_put_part(
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
consistency_mode: c,
upload_id,
version_uuid,
}));
@ -126,14 +136,14 @@ pub async fn handle_put_part(
size: None,
},
);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
let version = Version::new(
version_uuid,
VersionBacklink::MultipartUpload { upload_id },
false,
);
garage.version_table.insert(&version).await?;
garage.version_table.insert(c, &version).await?;
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
@ -157,7 +167,7 @@ pub async fn handle_put_part(
size: Some(total_size),
},
);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
@ -173,6 +183,7 @@ pub async fn handle_put_part(
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
consistency_mode: ConsistencyMode,
upload_id: Uuid,
version_uuid: Uuid,
}
@ -193,7 +204,12 @@ impl Drop for InterruptedCleanup {
},
true,
);
if let Err(e) = info.garage.version_table.insert(&version).await {
if let Err(e) = info
.garage
.version_table
.insert(info.consistency_mode, &version)
.await
{
warn!("Cannot cleanup after aborted UploadPart: {}", e);
}
});
@ -212,8 +228,10 @@ pub async fn handle_complete_multipart_upload(
garage,
bucket_id,
bucket_name,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
@ -279,7 +297,7 @@ pub async fn handle_complete_multipart_upload(
let grg = &garage;
let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move {
grg.version_table
.get(&p.version, &EmptyKey)
.get(c, &p.version, &EmptyKey)
.await?
.ok_or_internal_error("Part version missing from version table")
}))
@ -308,14 +326,14 @@ pub async fn handle_complete_multipart_upload(
);
}
}
garage.version_table.insert(&final_version).await?;
garage.version_table.insert(c, &final_version).await?;
let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef {
block: b.hash,
version: upload_id,
deleted: false.into(),
});
garage.block_ref_table.insert_many(block_refs).await?;
garage.block_ref_table.insert_many(c, block_refs).await?;
// Calculate etag of final object
// To understand how etags are calculated, read more here:
@ -336,7 +354,7 @@ pub async fn handle_complete_multipart_upload(
if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
return Err(e);
}
@ -352,7 +370,7 @@ pub async fn handle_complete_multipart_upload(
));
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
// Send response saying ok we're done
let result = s3_xml::CompleteMultipartUploadResult {
@ -373,8 +391,12 @@ pub async fn handle_abort_multipart_upload(
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let upload_id = decode_upload_id(upload_id)?;
@ -382,7 +404,7 @@ pub async fn handle_abort_multipart_upload(
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
Ok(Response::new(empty_body()))
}
@ -396,13 +418,21 @@ pub(crate) async fn get_upload(
upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let (object, mpu) = futures::try_join!(
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage
.object_table
.get(c, bucket_id, key)
.map_err(Error::from),
garage
.mpu_table
.get(upload_id, &EmptyKey)
.get(c, upload_id, &EmptyKey)
.map_err(Error::from),
)?;

View file

@ -20,6 +20,7 @@ use opentelemetry::{
};
use garage_net::bytes_buf::BytesBuf;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::async_hash::*;
@ -71,13 +72,20 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage
.object_table
.get(c, bucket_id, key)
.map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
@ -120,7 +128,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
};
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
return Ok((version_uuid, data_md5sum_hex));
}
@ -131,6 +139,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
bucket_id: *bucket_id,
consistency_mode: c,
key: key.into(),
version_uuid,
version_timestamp,
@ -147,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
};
let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
@ -161,7 +170,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
false,
);
garage.version_table.insert(&version).await?;
garage.version_table.insert(c, &version).await?;
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
@ -187,7 +196,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
first_block_hash,
));
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
@ -235,6 +244,7 @@ pub(crate) async fn check_quotas(
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let quotas = bucket_params.quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
@ -244,7 +254,7 @@ pub(crate) async fn check_quotas(
let counters = garage
.object_counter_table
.table
.get(bucket_id, &EmptyKey)
.get(c, bucket_id, &EmptyKey)
.await?;
let counters = counters
@ -451,7 +461,12 @@ async fn put_block_and_meta(
block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx;
let ReqCtx {
garage,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let mut version = version.clone();
version.blocks.put(
@ -474,9 +489,9 @@ async fn put_block_and_meta(
futures::try_join!(
garage
.block_manager
.rpc_put_block(hash, block, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
.rpc_put_block(c, hash, block, Some(order_tag)),
garage.version_table.insert(c, &version),
garage.block_ref_table.insert(c, &block_ref),
)?;
Ok(())
}
@ -529,6 +544,7 @@ struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
bucket_id: Uuid,
consistency_mode: ConsistencyMode,
key: String,
version_uuid: Uuid,
version_timestamp: u64,
@ -549,7 +565,12 @@ impl Drop for InterruptedCleanup {
state: ObjectVersionState::Aborted,
};
let object = Object::new(info.bucket_id, info.key, vec![object_version]);
if let Err(e) = info.garage.object_table.insert(&object).await {
if let Err(e) = info
.garage
.object_table
.insert(info.consistency_mode, &object)
.await
{
warn!("Cannot cleanup after aborted PutObject: {}", e);
}
});

View file

@ -49,7 +49,7 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err
bucket_params.website_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -83,7 +83,7 @@ pub async fn handle_put_website(
.update(Some(conf.into_garage_website_config()?));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -370,7 +370,7 @@ pub async fn verify_v4(
let key = garage
.key_table
.get(&EmptyKey, &auth.key_id)
.get((), &EmptyKey, &auth.key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;

View file

@ -29,12 +29,11 @@ use garage_util::metrics::RecordDuration;
use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::layout::*;
use crate::metrics::*;
@ -74,9 +73,6 @@ impl Rpc for BlockRpc {
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Data layout
pub(crate) data_layout: ArcSwap<DataLayout>,
/// Data layout persister
@ -122,7 +118,6 @@ impl BlockManager {
data_dir: DataDirEnum,
data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
) -> Result<Arc<Self>, Error> {
// Load or compute layout, i.e. assignment of data blocks to the different data directories
@ -163,7 +158,6 @@ impl BlockManager {
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
data_layout: ArcSwap::new(Arc::new(data_layout)),
data_layout_persister,
data_fsync,
@ -350,11 +344,12 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
consistency_mode: ConsistencyMode,
hash: Hash,
data: Bytes,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_sets(&hash);
let who = self.system.layout_manager.write_sets_of(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
@ -374,7 +369,7 @@ impl BlockManager {
who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
.with_quorum(self.system.layout_manager.write_quorum(consistency_mode)),
)
.await?;

View file

@ -28,8 +28,6 @@ use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::TableReplication;
use crate::manager::*;
// The delay between the time where a resync operation fails
@ -377,8 +375,17 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
let mut who = manager
.system
.layout_manager
.layout()
.storage_nodes_of(hash);
if who.len()
< manager
.system
.layout_manager
.write_quorum(Default::default())
{
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
who.retain(|id| *id != manager.system.id);

View file

@ -30,7 +30,14 @@ impl AdminRpcHandler {
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.get_range(
Default::default(),
&hash,
None,
None,
10000,
Default::default(),
)
.await?;
let mut versions = vec![];
let mut uploads = vec![];
@ -38,11 +45,16 @@ impl AdminRpcHandler {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.get(Default::default(), &br.version, &EmptyKey)
.await?
{
if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if let Some(u) = self
.garage
.mpu_table
.get(Default::default(), upload_id, &EmptyKey)
.await?
{
uploads.push(u);
}
}
@ -108,14 +120,21 @@ impl AdminRpcHandler {
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.get_range(
Default::default(),
&hash,
None,
None,
10000,
Default::default(),
)
.await?;
for br in block_refs {
if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.get(Default::default(), &br.version, &EmptyKey)
.await?
{
self.handle_block_purge_version_backlink(
@ -127,7 +146,10 @@ impl AdminRpcHandler {
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
self.garage.version_table.insert(&deleted_version).await?;
self.garage
.version_table
.insert(Default::default(), &deleted_version)
.await?;
ver_dels += 1;
}
}
@ -152,11 +174,19 @@ impl AdminRpcHandler {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if let Some(mut mpu) = self
.garage
.mpu_table
.get(Default::default(), upload_id, &EmptyKey)
.await?
{
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
self.garage
.mpu_table
.insert(Default::default(), &mpu)
.await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
@ -166,7 +196,12 @@ impl AdminRpcHandler {
}
};
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
if let Some(object) = self
.garage
.object_table
.get(Default::default(), &bucket_id, &key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
@ -180,7 +215,10 @@ impl AdminRpcHandler {
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
self.garage
.object_table
.insert(Default::default(), &deleted_object)
.await?;
*obj_dels += 1;
}
}

View file

@ -39,6 +39,7 @@ impl AdminRpcHandler {
.garage
.bucket_table
.get_range(
(),
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
@ -63,12 +64,14 @@ impl AdminRpcHandler {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.as_option().unwrap();
let c = *bucket_params.consistency_mode.get();
let counters = self
.garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
@ -77,42 +80,28 @@ impl AdminRpcHandler {
.garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
.as_option()
.unwrap()
.authorized_keys
.items()
.iter()
{
for (k, _) in bucket_params.authorized_keys.items().iter() {
if let Some(key) = self
.garage
.key_table
.get(&EmptyKey, k)
.get((), &EmptyKey, k)
.await?
.filter(|k| !k.is_deleted())
{
relevant_keys.insert(k.clone(), key);
}
}
for ((k, _), _, _) in bucket
.state
.as_option()
.unwrap()
.local_aliases
.items()
.iter()
{
for ((k, _), _, _) in bucket_params.local_aliases.items().iter() {
if relevant_keys.contains_key(k) {
continue;
}
if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
if let Some(key) = self.garage.key_table.get((), &EmptyKey, k).await? {
relevant_keys.insert(k.clone(), key);
}
}
@ -136,7 +125,12 @@ impl AdminRpcHandler {
let helper = self.garage.locked_helper().await;
if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
if let Some(alias) = self
.garage
.bucket_alias_table
.get((), &EmptyKey, name)
.await?
{
if alias.state.get().is_some() {
return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
}
@ -145,7 +139,7 @@ impl AdminRpcHandler {
// ---- done checking, now commit ----
let bucket = Bucket::new();
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
helper.set_global_bucket_alias(bucket.id, name).await?;
@ -170,7 +164,7 @@ impl AdminRpcHandler {
let bucket_alias = self
.garage
.bucket_alias_table
.get(&EmptyKey, &query.name)
.get((), &EmptyKey, &query.name)
.await?;
// Check bucket doesn't have other aliases
@ -225,7 +219,7 @@ impl AdminRpcHandler {
// 3. delete bucket
bucket.state = Deletable::delete();
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
}
@ -405,7 +399,7 @@ impl AdminRpcHandler {
};
bucket_state.website_config.update(website);
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
let msg = if query.allow {
format!("Website access allowed for {}", &query.bucket)
@ -462,7 +456,7 @@ impl AdminRpcHandler {
}
bucket_state.quotas.update(quotas);
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
Ok(AdminRpc::Ok(format!(
"Quotas updated for {}",

View file

@ -28,6 +28,7 @@ impl AdminRpcHandler {
.garage
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
@ -57,7 +58,7 @@ impl AdminRpcHandler {
async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
let key = Key::new(&query.name);
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -71,7 +72,7 @@ impl AdminRpcHandler {
.unwrap()
.name
.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -106,7 +107,7 @@ impl AdminRpcHandler {
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(true);
}
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -119,7 +120,7 @@ impl AdminRpcHandler {
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(false);
}
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -128,14 +129,18 @@ impl AdminRpcHandler {
return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
}
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
let prev_key = self
.garage
.key_table
.get((), &EmptyKey, &query.key_id)
.await?;
if prev_key.is_some() {
return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name)
.ok_or_bad_request("Invalid key format")?;
self.garage.key_table.insert(&imported_key).await?;
self.garage.key_table.insert((), &imported_key).await?;
self.key_info_result(imported_key).await
}
@ -151,7 +156,7 @@ impl AdminRpcHandler {
.items()
.iter()
{
if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
if let Some(b) = self.garage.bucket_table.get((), &EmptyKey, id).await? {
relevant_buckets.insert(*id, b);
}
}

View file

@ -176,7 +176,7 @@ impl TableRepair for RepairVersions {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
.object_table
.get(bucket_id, key)
.get(Default::default(), bucket_id, key)
.await?
.map(|o| {
o.versions().iter().any(|x| {
@ -186,7 +186,7 @@ impl TableRepair for RepairVersions {
.unwrap_or(false),
VersionBacklink::MultipartUpload { upload_id } => garage
.mpu_table
.get(upload_id, &EmptyKey)
.get(Default::default(), upload_id, &EmptyKey)
.await?
.map(|u| !u.deleted.get())
.unwrap_or(false),
@ -196,7 +196,10 @@ impl TableRepair for RepairVersions {
info!("Repair versions: marking version as deleted: {:?}", version);
garage
.version_table
.insert(&Version::new(version.uuid, version.backlink, true))
.insert(
Default::default(),
&Version::new(version.uuid, version.backlink, true),
)
.await?;
return Ok(true);
}
@ -222,7 +225,7 @@ impl TableRepair for RepairBlockRefs {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
.get(&block_ref.version, &EmptyKey)
.get(Default::default(), &block_ref.version, &EmptyKey)
.await?
.map(|v| !v.deleted.get())
.unwrap_or(false);
@ -233,7 +236,10 @@ impl TableRepair for RepairBlockRefs {
block_ref
);
block_ref.deleted.set();
garage.block_ref_table.insert(&block_ref).await?;
garage
.block_ref_table
.insert(Default::default(), &block_ref)
.await?;
return Ok(true);
}
}
@ -258,7 +264,7 @@ impl TableRepair for RepairMpu {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
.get(&mpu.bucket_id, &mpu.key)
.get(Default::default(), &mpu.bucket_id, &mpu.key)
.await?
.map(|o| {
o.versions()
@ -274,7 +280,7 @@ impl TableRepair for RepairMpu {
);
mpu.parts.clear();
mpu.deleted.set();
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(Default::default(), &mpu).await?;
return Ok(true);
}
}

View file

@ -163,7 +163,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
@ -185,7 +185,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
allow_world_readable_secrets = true
@ -296,7 +296,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"

View file

@ -54,7 +54,7 @@ metadata_dir = "{path}/meta"
data_dir = "{path}/data"
db_engine = "lmdb"
replication_mode = "1"
replication_factor = 1
rpc_bind_addr = "127.0.0.1:{rpc_port}"
rpc_public_addr = "127.0.0.1:{rpc_port}"

View file

@ -1,3 +1,4 @@
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
@ -119,7 +120,94 @@ mod v08 {
impl garage_util::migrate::InitialFormat for Bucket {}
}
pub use v08::*;
mod v011 {
use super::v08;
pub use super::v08::{
BucketQuotas, CorsRule, LifecycleExpiration, LifecycleFilter, LifecycleRule, WebsiteConfig,
};
use crate::permission::BucketKeyPerm;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
/// A bucket is a collection of objects
///
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// ID of the bucket
pub id: Uuid,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::Deletable<BucketParams>,
}
/// Configuration for a bucket
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Bucket's creation date
pub creation_date: u64,
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
/// Map of aliases that are or have been given to this bucket
/// in the global namespace
/// (not authoritative: this is just used as an indication to
/// map back to aliases when doing ListBuckets)
pub aliases: crdt::LwwMap<String, bool>,
/// Map of aliases that are or have been given to this bucket
/// in namespaces local to keys
/// key = (access key id, alias name)
pub local_aliases: crdt::LwwMap<(String, String), bool>,
/// Whether to enable read-after-write consistency for this bucket
pub consistency_mode: crdt::Lww<ConsistencyMode>,
/// Whether this bucket is allowed for website access
/// (under all of its global alias names),
/// and if so, the website configuration XML document
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
/// Lifecycle configuration
#[serde(default)]
pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
/// Bucket quotas
#[serde(default)]
pub quotas: crdt::Lww<BucketQuotas>,
}
impl garage_util::migrate::Migrate for Bucket {
const VERSION_MARKER: &'static [u8] = b"G011lh";
type Previous = v08::Bucket;
fn migrate(previous: Self::Previous) -> Self {
Self {
id: previous.id,
state: match previous.state {
crdt::Deletable::Present(prev_state) => {
crdt::Deletable::Present(BucketParams {
creation_date: prev_state.creation_date,
authorized_keys: prev_state.authorized_keys,
aliases: prev_state.aliases,
local_aliases: prev_state.local_aliases,
website_config: prev_state.website_config,
cors_config: prev_state.cors_config,
lifecycle_config: prev_state.lifecycle_config,
quotas: prev_state.quotas,
consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent),
})
}
crdt::Deletable::Deleted => crdt::Deletable::Deleted,
},
}
}
}
}
pub use v011::*;
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
@ -133,6 +221,7 @@ impl BucketParams {
authorized_keys: crdt::Map::new(),
aliases: crdt::LwwMap::new(),
local_aliases: crdt::LwwMap::new(),
consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
lifecycle_config: crdt::Lww::new(None),

View file

@ -9,7 +9,7 @@ use garage_util::config::*;
use garage_util::error::*;
use garage_util::persister::PersisterShared;
use garage_rpc::replication_mode::ReplicationMode;
use garage_rpc::replication_mode::*;
use garage_rpc::system::System;
use garage_block::manager::*;
@ -39,8 +39,8 @@ pub struct Garage {
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
/// The replication mode of this cluster
pub replication_mode: ReplicationMode,
/// The replication factor of this cluster
pub replication_factor: ReplicationFactor,
/// The local database
pub db: db::Db,
@ -222,27 +222,16 @@ impl Garage {
.and_then(|x| NetworkKey::from_slice(&x))
.ok_or_message("Invalid RPC secret key")?;
let replication_mode = ReplicationMode::parse(&config.replication_mode)
.ok_or_message("Invalid replication_mode in config file.")?;
let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;
info!("Initialize background variable system...");
let mut bg_vars = vars::BgVars::new();
info!("Initialize membership management system...");
let system = System::new(network_key, replication_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
replication_factor: replication_mode.replication_factor(),
write_quorum: replication_mode.write_quorum(),
read_quorum: 1,
};
let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
let meta_rep_param = TableShardedReplication {
system: system.clone(),
replication_factor: replication_mode.replication_factor(),
write_quorum: replication_mode.write_quorum(),
read_quorum: replication_mode.read_quorum(),
layout_manager: system.layout_manager.clone(),
};
let control_rep_param = TableFullReplication {
@ -255,7 +244,6 @@ impl Garage {
config.data_dir.clone(),
config.data_fsync,
config.compression_level,
data_rep_param,
system.clone(),
)?;
block_manager.register_bg_vars(&mut bg_vars);
@ -338,7 +326,7 @@ impl Garage {
Ok(Arc::new(Self {
config,
bg_vars,
replication_mode,
replication_factor,
db,
system,
block_manager,

View file

@ -36,7 +36,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.filter(|x| !x.state.is_deleted())
.map(|_| bucket_id))
@ -44,7 +44,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_alias_table
.get(&EmptyKey, bucket_name)
.get((), &EmptyKey, bucket_name)
.await?
.and_then(|x| *x.state.get()))
}
@ -74,7 +74,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?)
}
@ -86,7 +86,7 @@ impl<'a> BucketHelper<'a> {
pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
self.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.filter(|b| !b.is_deleted())
.ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id)))
@ -95,10 +95,20 @@ impl<'a> BucketHelper<'a> {
// ----
pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
let consistency_mode = *self
.get_existing_bucket(bucket_id)
.await?
.state
.as_option()
.unwrap()
.consistency_mode
.get();
let objects = self
.0
.object_table
.get_range(
consistency_mode,
&bucket_id,
None,
Some(ObjectFilter::IsData),
@ -124,6 +134,7 @@ impl<'a> BucketHelper<'a> {
.counter_table
.table
.get_range(
consistency_mode,
&bucket_id,
None,
Some((DeletedFilter::NotDeleted, node_id_vec)),
@ -151,6 +162,12 @@ impl<'a> BucketHelper<'a> {
older_than: Duration,
) -> Result<usize, Error> {
let older_than = now_msec() - older_than.as_millis() as u64;
let consistency_mode = self
.get_existing_bucket(*bucket_id)
.await?
.params()
.map(|params| *params.consistency_mode.get())
.unwrap_or_default();
let mut ret = 0usize;
let mut start = None;
@ -160,6 +177,7 @@ impl<'a> BucketHelper<'a> {
.0
.object_table
.get_range(
consistency_mode,
bucket_id,
start,
Some(ObjectFilter::IsUploading {
@ -196,7 +214,10 @@ impl<'a> BucketHelper<'a> {
.collect::<Vec<_>>();
ret += abortions.len();
self.0.object_table.insert_many(abortions).await?;
self.0
.object_table
.insert_many(consistency_mode, abortions)
.await?;
if objects.len() < 1000 {
break;

View file

@ -16,7 +16,7 @@ impl<'a> KeyHelper<'a> {
Ok(self
.0
.key_table
.get(&EmptyKey, key_id)
.get((), &EmptyKey, key_id)
.await?
.ok_or_message(format!("Key {} does not exist", key_id))?)
}
@ -28,7 +28,7 @@ impl<'a> KeyHelper<'a> {
pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
self.0
.key_table
.get(&EmptyKey, key_id)
.get((), &EmptyKey, key_id)
.await?
.filter(|b| !b.state.is_deleted())
.ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string()))
@ -44,6 +44,7 @@ impl<'a> KeyHelper<'a> {
.0
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),

View file

@ -56,7 +56,11 @@ impl<'a> LockedHelper<'a> {
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?;
let alias = self
.0
.bucket_alias_table
.get((), &EmptyKey, alias_name)
.await?;
if let Some(existing_alias) = alias.as_ref() {
if let Some(p_bucket) = existing_alias.state.get() {
@ -88,10 +92,10 @@ impl<'a> LockedHelper<'a> {
a
}
};
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -112,7 +116,7 @@ impl<'a> LockedHelper<'a> {
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.get((), &EmptyKey, alias_name)
.await?
.filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false))
.ok_or_message(format!(
@ -144,10 +148,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -168,7 +172,7 @@ impl<'a> LockedHelper<'a> {
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.get((), &EmptyKey, alias_name)
.await?
.ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
@ -186,12 +190,12 @@ impl<'a> LockedHelper<'a> {
if alias.state.get() == &Some(bucket_id) {
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
}
if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
}
Ok(())
@ -245,10 +249,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id));
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -317,10 +321,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None);
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -365,12 +369,12 @@ impl<'a> LockedHelper<'a> {
if let Some(bstate) = bucket.state.as_option_mut() {
bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
}
if let Some(kstate) = key.state.as_option_mut() {
kstate.authorized_buckets = Map::put_mutator(bucket_id, perm);
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
}
Ok(())
@ -403,7 +407,7 @@ impl<'a> LockedHelper<'a> {
// 3. Actually delete key
key.state = Deletable::delete();
self.0.key_table.insert(key).await?;
self.0.key_table.insert((), key).await?;
Ok(())
}

View file

@ -23,6 +23,7 @@ use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::now_msec;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::system::System;
use garage_rpc::*;
@ -43,8 +44,8 @@ const TIMESTAMP_KEY: &[u8] = b"timestamp";
#[derive(Debug, Serialize, Deserialize)]
enum K2VRpc {
Ok,
InsertItem(InsertedItem),
InsertManyItems(Vec<InsertedItem>),
InsertItem(ConsistencyMode, InsertedItem),
InsertManyItems(ConsistencyMode, Vec<InsertedItem>),
PollItem {
key: PollKey,
causal_context: CausalContext,
@ -113,6 +114,7 @@ impl K2VRpcHandler {
pub async fn insert(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
@ -135,12 +137,15 @@ impl K2VRpcHandler {
.try_call_many(
&self.endpoint,
&who,
K2VRpc::InsertItem(InsertedItem {
K2VRpc::InsertItem(
consistency_mode,
InsertedItem {
partition,
sort_key,
causal_context,
value,
}),
},
),
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@ -150,6 +155,7 @@ impl K2VRpcHandler {
pub async fn insert_batch(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
) -> Result<(), Error> {
@ -189,7 +195,7 @@ impl K2VRpcHandler {
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
K2VRpc::InsertManyItems(consistency_mode, items),
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@ -206,6 +212,7 @@ impl K2VRpcHandler {
pub async fn poll_item(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
@ -235,7 +242,12 @@ impl K2VRpcHandler {
timeout_msec,
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
.with_quorum(
self.item_table
.data
.replication
.read_quorum(consistency_mode),
)
.send_all_at_once(true)
.without_timeout(),
);
@ -266,6 +278,7 @@ impl K2VRpcHandler {
pub async fn poll_range(
&self,
consistency_mode: ConsistencyMode,
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
@ -288,7 +301,11 @@ impl K2VRpcHandler {
.data
.replication
.read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let quorum = self
.item_table
.data
.replication
.read_quorum(consistency_mode);
let msg = K2VRpc::PollRange {
range,
seen_str,
@ -376,7 +393,11 @@ impl K2VRpcHandler {
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
async fn handle_insert(
&self,
c: ConsistencyMode,
item: &InsertedItem,
) -> Result<K2VRpc, Error> {
let new = {
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
self.local_insert(&local_timestamp_tree, item)?
@ -384,13 +405,17 @@ impl K2VRpcHandler {
// Propagate to rest of network
if let Some(updated) = new {
self.item_table.insert(&updated).await?;
self.item_table.insert(c, &updated).await?;
}
Ok(K2VRpc::Ok)
}
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
async fn handle_insert_many(
&self,
c: ConsistencyMode,
items: &[InsertedItem],
) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
{
@ -406,7 +431,7 @@ impl K2VRpcHandler {
// Propagate to rest of network
if !updated_vec.is_empty() {
self.item_table.insert_many(&updated_vec).await?;
self.item_table.insert_many(c, &updated_vec).await?;
}
Ok(K2VRpc::Ok)
@ -546,8 +571,8 @@ impl K2VRpcHandler {
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
K2VRpc::InsertItem(item) => self.handle_insert(item).await,
K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
K2VRpc::InsertItem(c, item) => self.handle_insert(*c, item).await,
K2VRpc::InsertManyItems(c, items) => self.handle_insert_many(*c, &items[..]).await,
K2VRpc::PollItem {
key,
causal_context,

View file

@ -1,5 +1,6 @@
use std::sync::Arc;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::encode::nonversioned_decode;
@ -71,19 +72,23 @@ impl Migrate {
self.garage
.bucket_table
.insert(&Bucket {
.insert(
(),
&Bucket {
id: bucket_id,
state: Deletable::Present(BucketParams {
creation_date: now_msec(),
authorized_keys: Map::new(),
aliases: LwwMap::new(),
local_aliases: LwwMap::new(),
consistency_mode: Lww::new(ConsistencyMode::Consistent),
website_config: Lww::new(website),
cors_config: Lww::new(None),
lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
},
)
.await?;
helper.set_global_bucket_alias(bucket_id, &new_name).await?;

View file

@ -253,7 +253,7 @@ async fn process_object(
_ => {
match garage
.bucket_table
.get(&EmptyKey, &object.bucket_id)
.get((), &EmptyKey, &object.bucket_id)
.await?
{
Some(b) => b,

View file

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
use super::*;
use crate::replication_mode::ReplicationMode;
use crate::replication_mode::ReplicationFactor;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct RpcLayoutDigest {
@ -29,7 +29,7 @@ pub struct SyncLayoutDigest {
}
pub struct LayoutHelper {
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
layout: Option<LayoutHistory>,
// cached values
@ -57,7 +57,7 @@ impl Deref for LayoutHelper {
impl LayoutHelper {
pub fn new(
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
mut layout: LayoutHistory,
mut ack_lock: HashMap<u64, AtomicUsize>,
) -> Self {
@ -66,13 +66,6 @@ impl LayoutHelper {
// correct and we have rapid access to important values such as
// the layout versions to use when reading to ensure consistency.
if !replication_mode.is_read_after_write_consistent() {
// Fast path for when no consistency is required.
// In this case we only need to keep the last version of the layout,
// we don't care about coordinating stuff in the cluster.
layout.keep_current_version_only();
}
layout.cleanup_old_versions();
let all_nodes = layout.get_all_nodes();
@ -103,7 +96,7 @@ impl LayoutHelper {
// This value is calculated using quorums to allow progress even
// if not all nodes have successfully completed a sync.
let sync_map_min =
layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
@ -114,7 +107,7 @@ impl LayoutHelper {
.or_insert(AtomicUsize::new(0));
LayoutHelper {
replication_mode,
replication_factor,
layout: Some(layout),
ack_map_min,
sync_map_min,
@ -139,7 +132,7 @@ impl LayoutHelper {
let changed = f(self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
self.replication_mode,
self.replication_factor,
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);

View file

@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::*;
use crate::replication_mode::ReplicationMode;
use crate::replication_mode::*;
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
pub fn new(replication_factor: ReplicationFactor) -> Self {
let version = LayoutVersion::new(replication_factor.into());
let staging = LayoutStaging {
parameters: Lww::<LayoutParameters>::new(version.parameters),
@ -119,7 +119,7 @@ impl LayoutHistory {
pub(crate) fn calculate_sync_map_min_with_quorum(
&self,
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
all_nongateway_nodes: &[Uuid],
) -> u64 {
// This function calculates the minimum layout version from which
@ -133,7 +133,7 @@ impl LayoutHistory {
return self.current().version;
}
let quorum = replication_mode.write_quorum();
let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
let min_version = self.min_stored();
let global_min = self

View file

@ -14,13 +14,14 @@ use garage_util::error::*;
use garage_util::persister::Persister;
use super::*;
use crate::replication_mode::ReplicationMode;
use crate::replication_mode::*;
use crate::rpc_helper::*;
use crate::system::*;
pub struct LayoutManager {
node_id: Uuid,
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode,
persist_cluster_layout: Persister<LayoutHistory>,
layout: Arc<RwLock<LayoutHelper>>,
@ -38,20 +39,19 @@ impl LayoutManager {
node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
peering: Arc<PeeringManager>,
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode,
) -> Result<Arc<Self>, Error> {
let replication_factor = replication_mode.replication_factor();
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
if x.current().replication_factor != replication_mode.replication_factor() {
if x.current().replication_factor != replication_factor.replication_factor() {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
replication_factor
replication_factor.replication_factor()
)));
}
x
@ -66,7 +66,7 @@ impl LayoutManager {
};
let mut cluster_layout =
LayoutHelper::new(replication_mode, cluster_layout, Default::default());
LayoutHelper::new(replication_factor, cluster_layout, Default::default());
cluster_layout.update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout));
@ -81,7 +81,8 @@ impl LayoutManager {
Ok(Arc::new(Self {
node_id: node_id.into(),
replication_mode,
replication_factor,
consistency_mode,
persist_cluster_layout,
layout,
change_notify,
@ -138,6 +139,16 @@ impl LayoutManager {
}
}
pub fn read_quorum(self: &Arc<Self>, bucket_consistency_mode: ConsistencyMode) -> usize {
self.replication_factor
.read_quorum(bucket_consistency_mode.min(self.consistency_mode))
}
pub fn write_quorum(self: &Arc<Self>, bucket_consistency_mode: ConsistencyMode) -> usize {
self.replication_factor
.write_quorum(bucket_consistency_mode.min(self.consistency_mode))
}
// ---- ACK LOCKING ----
pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
@ -295,11 +306,11 @@ impl LayoutManager {
adv.update_trackers
);
if adv.current().replication_factor != self.replication_mode.replication_factor() {
if adv.current().replication_factor != self.replication_factor.replication_factor() {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
self.replication_mode.replication_factor()
self.replication_factor.replication_factor()
);
error!("{}", msg);
return Err(Error::Message(msg));

View file

@ -5,6 +5,7 @@ use garage_util::crdt::Crdt;
use garage_util::error::*;
use crate::layout::*;
use crate::replication_mode::ReplicationFactor;
// This function checks that the partition size S computed is at least better than the
// one given by a very naive algorithm. To do so, we try to run the naive algorithm
@ -120,7 +121,7 @@ fn test_assignment() {
let mut node_capacity_vec = vec![4000, 1000, 2000];
let mut node_zone_vec = vec!["A", "B", "C"];
let mut cl = LayoutHistory::new(3);
let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();

View file

@ -1,57 +1,94 @@
#[derive(Clone, Copy)]
pub enum ReplicationMode {
None,
TwoWay,
TwoWayDangerous,
ThreeWay,
ThreeWayDegraded,
ThreeWayDangerous,
use garage_util::config::Config;
use garage_util::crdt::AutoCrdt;
use garage_util::error::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ReplicationFactor(usize);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ConsistencyMode {
/// Read- and Write-quorum are 1
Dangerous,
/// Read-quorum is 1
Degraded,
/// Read- and Write-quorum are determined for read-after-write-consistency
#[default]
Consistent,
}
impl ReplicationMode {
pub fn parse(v: &str) -> Option<Self> {
match v {
"none" | "1" => Some(Self::None),
"2" => Some(Self::TwoWay),
"2-dangerous" => Some(Self::TwoWayDangerous),
"3" => Some(Self::ThreeWay),
"3-degraded" => Some(Self::ThreeWayDegraded),
"3-dangerous" => Some(Self::ThreeWayDangerous),
_ => None,
impl ConsistencyMode {
pub fn parse(s: &str) -> Option<Self> {
serde_json::from_value(serde_json::Value::String(s.to_string())).ok()
}
}
impl AutoCrdt for ConsistencyMode {
const WARN_IF_DIFFERENT: bool = true;
}
impl ReplicationFactor {
pub fn new(replication_factor: usize) -> Option<Self> {
if replication_factor < 1 {
None
} else {
Some(Self(replication_factor))
}
}
pub fn replication_factor(&self) -> usize {
match self {
Self::None => 1,
Self::TwoWay | Self::TwoWayDangerous => 2,
Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3,
self.0
}
pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
}
}
pub fn read_quorum(&self) -> usize {
match self {
Self::None => 1,
Self::TwoWay | Self::TwoWayDangerous => 1,
Self::ThreeWay => 2,
Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1,
pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous => 1,
ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
(self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
}
}
}
}
pub fn write_quorum(&self) -> usize {
match self {
Self::None => 1,
Self::TwoWay => 2,
Self::TwoWayDangerous => 1,
Self::ThreeWay | Self::ThreeWayDegraded => 2,
Self::ThreeWayDangerous => 1,
impl std::convert::From<ReplicationFactor> for usize {
fn from(replication_factor: ReplicationFactor) -> usize {
replication_factor.0
}
}
pub fn is_read_after_write_consistent(&self) -> bool {
match self {
Self::None | Self::TwoWay | Self::ThreeWay => true,
_ => false,
}
pub fn parse_replication_mode(
config: &Config,
) -> Result<(ReplicationFactor, ConsistencyMode), Error> {
match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) {
(Some(replication_mode), None, "consistent") => {
tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode");
let parsed_replication_mode = match replication_mode.as_str() {
"1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)),
"2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)),
"2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)),
"3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)),
"3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)),
"3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)),
_ => None,
};
Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?)
},
(None, Some(replication_factor), consistency_mode) => {
let replication_factor = ReplicationFactor::new(replication_factor)
.ok_or_message("Invalid replication_factor in config file.")?;
let consistency_mode = ConsistencyMode::parse(consistency_mode)
.ok_or_message("Invalid consistency_mode in config file.")?;
Some((replication_factor, consistency_mode))
}
_ => None,
}.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.")
}

View file

@ -112,8 +112,7 @@ pub struct System {
metrics: ArcSwapOption<SystemMetrics>,
replication_mode: ReplicationMode,
pub(crate) replication_factor: usize,
pub(crate) replication_factor: ReplicationFactor,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@ -243,7 +242,8 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
replication_mode: ReplicationMode,
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
// ---- setup netapp RPC protocol ----
@ -274,14 +274,13 @@ impl System {
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
// ---- setup cluster layout and layout manager ----
let replication_factor = replication_mode.replication_factor();
let layout_manager = LayoutManager::new(
config,
netapp.id,
system_endpoint.clone(),
peering.clone(),
replication_mode,
replication_factor,
consistency_mode,
)?;
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
@ -315,7 +314,6 @@ impl System {
netapp: netapp.clone(),
peering: peering.clone(),
system_endpoint,
replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr,
@ -427,7 +425,9 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
let quorum = self.replication_mode.write_quorum();
let quorum = self
.replication_factor
.write_quorum(ConsistencyMode::Consistent);
// Gather information about running nodes.
// Technically, `nodes` contains currently running nodes, as well
@ -631,7 +631,7 @@ impl System {
.count();
let not_configured = self.cluster_layout().check().is_err();
let no_peers = n_connected < self.replication_factor;
let no_peers = n_connected < self.replication_factor.into();
let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = n_connected != expected_n_nodes;
@ -774,14 +774,14 @@ impl EndpointHandler<SystemRpc> for System {
}
impl NodeStatus {
fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self {
NodeStatus {
hostname: Some(
gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
),
replication_factor,
replication_factor: replication_factor.into(),
layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None,
data_disk_avail: None,

View file

@ -68,7 +68,7 @@ impl SystemMetrics {
let replication_factor = system.replication_factor;
meter
.u64_value_observer("garage_replication_factor", move |observer| {
observer.observe(replication_factor as u64, &[])
observer.observe(replication_factor.replication_factor() as u64, &[])
})
.with_description("Garage replication factor setting")
.init()

View file

@ -51,7 +51,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
return Ok(WorkerState::Idle);
}
self.0.insert_many(values).await?;
self.0.insert_many(Default::default(), values).await?;
self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() {

View file

@ -25,6 +25,7 @@ pub struct TableFullReplication {
impl TableReplication for TableFullReplication {
type WriteSets = Vec<Vec<Uuid>>;
type ConsistencyParam = ();
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout();
@ -34,14 +35,14 @@ impl TableReplication for TableFullReplication {
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
fn read_quorum(&self, _: ()) -> usize {
1
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
fn write_quorum(&self, _: ()) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
let max_faults = if nmembers > 1 { 1 } else { 0 };

View file

@ -4,6 +4,7 @@ use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
type ConsistencyParam: Send + Default;
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
@ -14,12 +15,12 @@ pub trait TableReplication: Send + Sync + 'static {
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
fn read_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize;
/// Which nodes to send writes to
fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
/// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn write_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash

View file

@ -1,7 +1,8 @@
use std::sync::Arc;
use garage_rpc::layout::manager::LayoutManager;
use garage_rpc::layout::*;
use garage_rpc::system::System;
use garage_rpc::replication_mode::*;
use garage_util::data::*;
use crate::replication::*;
@ -15,42 +16,37 @@ use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>,
/// How many time each data should be replicated
pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize,
pub layout_manager: Arc<LayoutManager>,
}
impl TableReplication for TableShardedReplication {
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
type ConsistencyParam = ConsistencyMode;
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().storage_nodes_of(hash)
self.layout_manager.layout().storage_nodes_of(hash)
}
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().read_nodes_of(hash)
self.layout_manager.layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
fn read_quorum(&self, c: ConsistencyMode) -> usize {
self.layout_manager.read_quorum(c)
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
self.system.layout_manager.write_sets_of(hash)
self.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
fn write_quorum(&self, c: ConsistencyMode) -> usize {
self.layout_manager.write_quorum(c)
}
fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().current().partition_of(hash)
self.layout_manager.layout().current().partition_of(hash)
}
fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout();
let layout = self.layout_manager.layout();
let layout_version = layout.ack_map_min();
let mut partitions = layout

View file

@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
);
let mut result_tracker = QuorumSetResultTracker::new(
&partition.storage_sets,
self.data.replication.write_quorum(),
self.data.replication.write_quorum(Default::default()),
);
let mut sync_futures = result_tracker
@ -190,7 +190,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
);
break;
}
if nodes.len() < self.data.replication.write_quorum() {
if nodes.len() < self.data.replication.write_quorum(Default::default()) {
return Err(Error::Message(
"Not offloading as we don't have a quorum of nodes to write to."
.to_string(),

View file

@ -104,11 +104,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
bg.spawn_worker(InsertQueueWorker(self.clone()));
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
pub async fn insert(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
self.insert_internal(e)
self.insert_internal(c, e)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
async fn insert_internal(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_sets(&hash);
@ -132,7 +132,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
.with_quorum(self.data.replication.write_quorum(c)),
)
.await?;
@ -144,7 +144,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
self.data.queue_insert(tx, e)
}
pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
pub async fn insert_many<I, IE>(
self: &Arc<Self>,
c: R::ConsistencyParam,
entries: I,
) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@ -152,7 +156,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
self.insert_many_internal(entries)
self.insert_many_internal(c, entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -162,7 +166,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
async fn insert_many_internal<I, IE>(
self: &Arc<Self>,
c: R::ConsistencyParam,
entries: I,
) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@ -181,7 +189,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
// a quorum of nodes has answered OK, then the insert has succeeded and
// consistency properties (read-after-write) are preserved.
let quorum = self.data.replication.write_quorum();
let quorum = self.data.replication.write_quorum(c);
// Serialize all entries and compute the write sets for each of them.
// In the case of sharded table replication, this also takes an "ack lock"
@ -283,6 +291,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
pub async fn get(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
@ -290,7 +299,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let span = tracer.start(format!("{} get", F::TABLE_NAME));
let res = self
.get_internal(partition_key, sort_key)
.get_internal(c, partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -302,6 +311,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn get_internal(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
@ -317,7 +327,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
&who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum()),
.with_quorum(self.data.replication.read_quorum(c)),
)
.await?;
@ -359,6 +369,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
pub async fn get_range(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
@ -370,6 +381,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let res = self
.get_range_internal(
c,
partition_key,
begin_sort_key,
filter,
@ -387,6 +399,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn get_range_internal(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
@ -412,7 +425,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
&who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum()),
.with_quorum(self.data.replication.read_quorum(c)),
)
.await?;

View file

@ -30,12 +30,20 @@ pub struct Config {
)]
pub block_size: usize,
/// Replication mode. Supported values:
/// - none, 1 -> no replication
/// - 2 -> 2-way replication
/// - 3 -> 3-way replication
// (we can add more aliases for this later)
pub replication_mode: String,
/// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
/// - 1 for single-node clusters, or to disable replication
/// - 3 is the recommended and supported setting.
#[serde(default)]
pub replication_factor: Option<usize>,
/// Consistency mode for all for requests through this node
/// - Degraded -> Disable read quorum
/// - Dangerous -> Disable read and write quorum
#[serde(default = "default_consistency_mode")]
pub consistency_mode: String,
/// Legacy option
pub replication_mode: Option<String>,
/// Zstd compression level used on data blocks
#[serde(
@ -244,10 +252,15 @@ fn default_sled_cache_capacity() -> usize {
fn default_sled_flush_every_ms() -> u64 {
2000
}
fn default_block_size() -> usize {
1048576
}
fn default_consistency_mode() -> String {
"consistent".into()
}
fn default_compression() -> Option<i32> {
Some(1)
}
@ -359,7 +372,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret = "foo"

View file

@ -84,7 +84,7 @@ where
&self.v
}
/// Take the value inside the CRDT (discards the timesamp)
/// Take the value inside the CRDT (discards the timestamp)
pub fn take(self) -> T {
self.v
}

View file

@ -28,6 +28,7 @@ use garage_api::s3::error::{
};
use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_table::*;
@ -182,11 +183,20 @@ impl WebServer {
}
}
async fn check_key_exists(self: &Arc<Self>, bucket_id: Uuid, key: &str) -> Result<bool, Error> {
async fn check_key_exists(
self: &Arc<Self>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
) -> Result<bool, Error> {
let exists = self
.garage
.object_table
.get(&bucket_id, &key.to_string())
.get(
*bucket_params.consistency_mode.get(),
&bucket_id,
&key.to_string(),
)
.await?
.map(|object| object.versions().iter().any(|v| v.is_data()))
.unwrap_or(false);
@ -211,7 +221,7 @@ impl WebServer {
let bucket_id = self
.garage
.bucket_alias_table
.get(&EmptyKey, &bucket_name.to_string())
.get((), &EmptyKey, &bucket_name.to_string())
.await?
.and_then(|x| x.state.take())
.ok_or(Error::NotFound)?;
@ -246,13 +256,22 @@ impl WebServer {
.map_err(ApiError::from)
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
Method::HEAD => {
handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
handle_head_without_ctx(
self.garage.clone(),
req,
bucket_id,
&bucket_params,
&key,
None,
)
.await
}
Method::GET => {
handle_get_without_ctx(
self.garage.clone(),
req,
bucket_id,
&bucket_params,
&key,
None,
Default::default(),
@ -265,7 +284,9 @@ impl WebServer {
// Try implicit redirect on error
let ret_doc_with_redir = match (&ret_doc, may_redirect) {
(Err(ApiError::NoSuchKey), ImplicitRedirect::To { key, url })
if self.check_key_exists(bucket_id, key.as_str()).await? =>
if self
.check_key_exists(bucket_id, &bucket_params, key.as_str())
.await? =>
{
Ok(Response::builder()
.status(StatusCode::FOUND)
@ -306,6 +327,7 @@ impl WebServer {
self.garage.clone(),
&req2,
bucket_id,
&bucket_params,
&error_document,
None,
Default::default(),