From 9f25721ad2be3475e94fee9912cfc5ecec1b6838 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 18:42:17 +0100 Subject: [PATCH 1/4] adjust docs for replication factor --- doc/book/cookbook/real-world.md | 2 +- doc/book/operations/layout.md | 2 +- doc/book/quick-start/_index.md | 2 +- doc/book/reference-manual/configuration.md | 133 ++++++++++++--------- doc/book/reference-manual/features.md | 6 +- 5 files changed, 83 insertions(+), 62 deletions(-) diff --git a/doc/book/cookbook/real-world.md b/doc/book/cookbook/real-world.md index c15ea384..cb10b550 100644 --- a/doc/book/cookbook/real-world.md +++ b/doc/book/cookbook/real-world.md @@ -116,7 +116,7 @@ metadata_dir = "/var/lib/garage/meta" data_dir = "/var/lib/garage/data" db_engine = "lmdb" -replication_mode = "3" +replication_factor = 3 compression_level = 2 diff --git a/doc/book/operations/layout.md b/doc/book/operations/layout.md index cf1372b0..667e89d2 100644 --- a/doc/book/operations/layout.md +++ b/doc/book/operations/layout.md @@ -12,7 +12,7 @@ An introduction to building cluster layouts can be found in the [production depl In Garage, all of the data that can be stored in a given cluster is divided into slices which we call *partitions*. Each partition is stored by one or several nodes in the cluster -(see [`replication_mode`](@/documentation/reference-manual/configuration.md#replication_mode)). +(see [`replication_factor`](@/documentation/reference-manual/configuration.md#replication_factor)). The layout determines the correspondence between these partitions, which exist on a logical level, and actual storage nodes. diff --git a/doc/book/quick-start/_index.md b/doc/book/quick-start/_index.md index f359843d..be9fe329 100644 --- a/doc/book/quick-start/_index.md +++ b/doc/book/quick-start/_index.md @@ -59,7 +59,7 @@ metadata_dir = "/tmp/meta" data_dir = "/tmp/data" db_engine = "lmdb" -replication_mode = "none" +replication_factor = 1 rpc_bind_addr = "[::]:3901" rpc_public_addr = "127.0.0.1:3901" diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index af7690f4..580e9fbc 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -8,7 +8,8 @@ weight = 20 Here is an example `garage.toml` configuration file that illustrates all of the possible options: ```toml -replication_mode = "3" +replication_factor = 3 +consistency_mode = "consistent" metadata_dir = "/var/lib/garage/meta" data_dir = "/var/lib/garage/data" @@ -90,7 +91,8 @@ Top-level configuration options: [`lmdb_map_size`](#lmdb_map_size), [`metadata_dir`](#metadata_dir), [`metadata_fsync`](#metadata_fsync), -[`replication_mode`](#replication_mode), +[`replication_factor`](#replication_factor), +[`consistency_mode`](#consistency_mode), [`rpc_bind_addr`](#rpc_bind_addr), [`rpc_bind_outgoing`](#rpc_bind_outgoing), [`rpc_public_addr`](#rpc_public_addr), @@ -133,11 +135,12 @@ The `[admin]` section: ### Top-level configuration options -#### `replication_mode` {#replication_mode} +#### `replication_factor` {#replication_factor} -Garage supports the following replication modes: +The replication factor can be any positive integer smaller or equal the node count in your cluster. +The chosen replication factor has a big impact on the cluster's failure tolerancy and performance characteristics. -- `none` or `1`: data stored on Garage is stored on a single node. There is no +- `1`: data stored on Garage is stored on a single node. 
There is no
redundancy, and data will be unavailable as soon as one node fails or its
network is disconnected. Do not use this for anything else than test
deployments.

@@ -148,17 +151,6 @@ Garage supports the following replication modes:
   before losing data. Data remains available in read-only mode when one node
   is down, but write operations will fail.

-  - `2-dangerous`: a variant of mode `2`, where written objects are written to
-    the second replica asynchronously. This means that Garage will return `200
-    OK` to a PutObject request before the second copy is fully written (or even
-    before it even starts being written). This means that data can more easily
-    be lost if the node crashes before a second copy can be completed. This
-    also means that written objects might not be visible immediately in read
-    operations. In other words, this mode severely breaks the consistency and
-    durability guarantees of standard Garage cluster operation. Benefits of
-    this mode: you can still write to your cluster when one node is
-    unavailable.
-
 - `3`: data stored on Garage will be stored on three different nodes, if
   possible each in a different zones. Garage tolerates two node failure, or
   several node failures but in no more than two zones (in a deployment with at
@@ -166,55 +158,84 @@ Garage supports the following replication modes:
   or node failures are only in a single zone, reading and writing data to
   Garage can continue normally.

-  - `3-degraded`: a variant of replication mode `3`, that lowers the read
-    quorum to `1`, to allow you to read data from your cluster when several
-    nodes (or nodes in several zones) are unavailable. In this mode, Garage
-    does not provide read-after-write consistency anymore. The write quorum is
-    still 2, ensuring that data successfully written to Garage is stored on at
-    least two nodes.
-
-  - `3-dangerous`: a variant of replication mode `3` that lowers both the read
-    and write quorums to `1`, to allow you to both read and write to your
-    cluster when several nodes (or nodes in several zones) are unavailable. It
-    is the least consistent mode of operation proposed by Garage, and also one
-    that should probably never be used.
+- `5`, `7`, ...: When setting the replication factor above 3, it is most useful to
+  choose an uneven value, since for every two copies added, one more node can fail
+  before losing the ability to write and read to the cluster.

Note that in modes `2` and `3`, if at least the same number of zones are available,
an arbitrary number of failures in any given zone is tolerated as copies of data
will be spread over several zones.

**Make sure `replication_factor` is the same in the configuration files of all nodes.
Never run a Garage cluster where that is not the case.**

+It is technically possible to change the replication factor although it's a
+dangerous operation that is not officially supported. This requires you to
+delete the existing cluster layout and create a new layout from scratch,
+meaning that a full rebalancing of your cluster's data will be needed. To do
+it, shut down your cluster entirely, delete the `cluster_layout` files in the
+meta directories of all your nodes, update all your configuration files with
+the new `replication_factor` parameter, restart your cluster, and then create a
+new layout with all the nodes you want to keep. Rebalancing data will take
+some time, and data might temporarily appear unavailable to your users.
+It is recommended to shut down public access to the cluster while rebalancing
+is in progress. In theory, no data should be lost as rebalancing is a
+routine operation for Garage, although we cannot guarantee you that everything
+will go right in such an extreme scenario.
+
+#### `consistency_mode` {#consistency_mode}
+
+The consistency mode setting determines the read and write behaviour of your cluster.
+
+ - `consistent`: The default setting. This is what the paragraph above describes.
+   The read and write quorum will be determined so that read-after-write consistency
+   is guaranteed.
+ - `degraded`: Lowers the read quorum to `1`, to allow you to read data from your
+   cluster when several nodes (or nodes in several zones) are unavailable. In this
+   mode, Garage does not provide read-after-write consistency anymore.
+   The write quorum stays the same as in the `consistent` mode, ensuring that
+   data successfully written to Garage is stored on multiple nodes (depending on
+   the replication factor).
+ - `dangerous`: This mode lowers both the read and write quorums to `1`, to allow
+   you to both read and write to your cluster when several nodes (or nodes in
+   several zones) are unavailable. It is the least consistent mode of operation
+   proposed by Garage, and also one that should probably never be used.
+
+Changing the `consistency_mode` while leaving the `replication_factor` untouched
+(e.g. setting your node's `consistency_mode` to `degraded` when it was previously unset, or from
+`dangerous` to `consistent`) can be done easily by just changing the `consistency_mode`
+parameter in your config files and restarting all your Garage nodes.
+
+The consistency mode can be used together with various replication factors, to achieve
+a wide range of read and write characteristics. Some examples:
+
+ - Replication factor `2`, consistency mode `degraded`: While this mode
+   technically exists, its properties are the same as with consistency mode `consistent`,
+   since the read quorum with replication factor `2`, consistency mode `consistent` is already 1.
+
+ - Replication factor `2`, consistency mode `dangerous`: written objects are written to
+   the second replica asynchronously. This means that Garage will return `200
+   OK` to a PutObject request before the second copy is fully written (or even
+   before it even starts being written). This means that data can more easily
+   be lost if the node crashes before a second copy can be completed. This
+   also means that written objects might not be visible immediately in read
+   operations. In other words, this configuration severely breaks the consistency and
+   durability guarantees of standard Garage cluster operation. Benefits of
+   this configuration: you can still write to your cluster when one node is
+   unavailable.
+
 The quorums associated with each replication mode are described below:

-| `replication_mode` | Number of replicas | Write quorum | Read quorum | Read-after-write consistency? |
-| ------------------ | ------------------ | ------------ | ----------- | ----------------------------- |
-| `none` or `1`      | 1                  | 1            | 1           | yes                           |
-| `2`                | 2                  | 2            | 1           | yes                           |
-| `2-dangerous`      | 2                  | 1            | 1           | NO                            |
-| `3`                | 3                  | 2            | 2           | yes                           |
-| `3-degraded`       | 3                  | 2            | 1           | NO                            |
-| `3-dangerous`      | 3                  | 1            | 1           | NO                            |
-
-Changing the `replication_mode` between modes with the same number of replicas
-(e.g. from `3` to `3-degraded`, or from `2-dangerous` to `2`), can be done easily by
-just changing the `replication_mode` parameter in your config files and restarting all your
-Garage nodes.
-
-It is also technically possible to change the replication mode to a mode with a
-different numbers of replicas, although it's a dangerous operation that is not
-officially supported. This requires you to delete the existing cluster layout
-and create a new layout from scratch, meaning that a full rebalancing of your
-cluster's data will be needed. To do it, shut down your cluster entirely,
-delete the `custer_layout` files in the meta directories of all your nodes,
-update all your configuration files with the new `replication_mode` parameter,
-restart your cluster, and then create a new layout with all the nodes you want
-to keep. Rebalancing data will take some time, and data might temporarily
-appear unavailable to your users. It is recommended to shut down public access
-to the cluster while rebalancing is in progress. In theory, no data should be
-lost as rebalancing is a routine operation for Garage, although we cannot
-guarantee you that everything will go right in such an extreme scenario.
+| `consistency_mode` | `replication_factor` | Write quorum | Read quorum | Read-after-write consistency? |
+| ------------------ | -------------------- | ------------ | ----------- | ----------------------------- |
+| `consistent`       | 1                    | 1            | 1           | yes                           |
+| `consistent`       | 2                    | 2            | 1           | yes                           |
+| `dangerous`        | 2                    | 1            | 1           | NO                            |
+| `consistent`       | 3                    | 2            | 2           | yes                           |
+| `degraded`         | 3                    | 2            | 1           | NO                            |
+| `dangerous`        | 3                    | 1            | 1           | NO                            |

 #### `metadata_dir` {#metadata_dir}

diff --git a/doc/book/reference-manual/features.md b/doc/book/reference-manual/features.md
index f7014b26..34f692cc 100644
--- a/doc/book/reference-manual/features.md
+++ b/doc/book/reference-manual/features.md
@@ -39,10 +39,10 @@ Read about cluster layout management [here](@/documentation/operations/layout.md

 ### Several replication modes

-Garage supports a variety of replication modes, with 1 copy, 2 copies or 3 copies of your data,
+Garage supports a variety of replication modes, with configurable replica count,
 and with various levels of consistency, in order to adapt to a variety of usage scenarios.
-Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_mode)
-to select the replication mode best suited to your use case (hint: in most cases, `replication_mode = "3"` is what you want).
+Read our reference page on [supported replication modes](@/documentation/reference-manual/configuration.md#replication_factor)
+to select the replication mode best suited to your use case (hint: in most cases, `replication_factor = 3` is what you want).
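+
+For example, on a small three-node cluster, the relevant lines of each node's
+`garage.toml` might look like the following sketch (all other options are
+omitted here; adapt the values to your own deployment):
+
+```toml
+# Store three copies of every object (the recommended setting).
+replication_factor = 3
+# "consistent" is the default and keeps read-after-write consistency.
+consistency_mode = "consistent"
+```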
### Compression and deduplication -- 2.43.4 From ce042e48d48479cc0481b638bafbf83060e45d9b Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 18:42:25 +0100 Subject: [PATCH 2/4] ReplicationMode -> ConsistencyMode+ReplicationFactor --- script/dev-cluster.sh | 2 +- script/helm/garage/values.yaml | 8 +- .../src/jepsen/garage/daemon.clj | 2 +- script/k8s/config.yaml | 2 +- src/garage/secrets.rs | 6 +- src/garage/tests/common/garage.rs | 2 +- src/model/garage.rs | 23 ++-- src/rpc/layout/helper.rs | 18 ++- src/rpc/layout/history.rs | 10 +- src/rpc/layout/manager.rs | 27 ++-- src/rpc/layout/test.rs | 3 +- src/rpc/replication_mode.rs | 127 +++++++++++------- src/rpc/system.rs | 22 +-- src/rpc/system_metrics.rs | 2 +- src/util/config.rs | 27 +++- 15 files changed, 169 insertions(+), 112 deletions(-) diff --git a/script/dev-cluster.sh b/script/dev-cluster.sh index 6b39255a..f3ccce44 100755 --- a/script/dev-cluster.sh +++ b/script/dev-cluster.sh @@ -38,7 +38,7 @@ data_dir = "/tmp/garage-data-$count" rpc_bind_addr = "0.0.0.0:$((3900+$count))" # the port other Garage nodes will use to talk to this node rpc_public_addr = "127.0.0.1:$((3900+$count))" bootstrap_peers = [] -replication_mode = "3" +replication_factor = 3 rpc_secret = "$NETWORK_SECRET" [s3_api] diff --git a/script/helm/garage/values.yaml b/script/helm/garage/values.yaml index 02a6651b..1fccffac 100644 --- a/script/helm/garage/values.yaml +++ b/script/helm/garage/values.yaml @@ -18,9 +18,9 @@ garage: sledCacheCapacity: "134217728" sledFlushEveryMs: "2000" - # Default to 3 replicas, see the replication_mode section at - # https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#replication-mode - replicationMode: "3" + # Default to 3 replicas, see the replication_factor section at + # https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#replication-factor + replicationFactor: 3 # zstd compression level of stored blocks # https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#compression-level @@ -55,7 +55,7 @@ garage: sled_flush_every_ms = {{ .Values.garage.sledFlushEveryMs }} {{- end }} - replication_mode = "{{ .Values.garage.replicationMode }}" + replication_factor = {{ .Values.garage.replicationFactor }} compression_level = {{ .Values.garage.compressionLevel }} diff --git a/script/jepsen.garage/src/jepsen/garage/daemon.clj b/script/jepsen.garage/src/jepsen/garage/daemon.clj index d407dd29..b1d52b6f 100644 --- a/script/jepsen.garage/src/jepsen/garage/daemon.clj +++ b/script/jepsen.garage/src/jepsen/garage/daemon.clj @@ -43,7 +43,7 @@ "rpc_bind_addr = \"0.0.0.0:3901\"\n" "rpc_public_addr = \"" node ":3901\"\n" "db_engine = \"lmdb\"\n" - "replication_mode = \"2\"\n" + "replication_factor = 2\n" "data_dir = \"" data-dir "\"\n" "metadata_dir = \"" meta-dir "\"\n" "[s3_api]\n" diff --git a/script/k8s/config.yaml b/script/k8s/config.yaml index 8cf40fc2..bfefd999 100644 --- a/script/k8s/config.yaml +++ b/script/k8s/config.yaml @@ -8,7 +8,7 @@ data: metadata_dir = "/tmp/meta" data_dir = "/tmp/data" - replication_mode = "3" + replication_factor = 3 rpc_bind_addr = "[::]:3901" rpc_secret = "1799bccfd7411eddcf9ebd316bc1f5287ad12a68094e1c6ac6abde7e6feae1ec" diff --git a/src/garage/secrets.rs b/src/garage/secrets.rs index c3d704aa..8d2ff475 100644 --- a/src/garage/secrets.rs +++ b/src/garage/secrets.rs @@ -163,7 +163,7 @@ mod tests { r#" metadata_dir = "/tmp/garage/meta" data_dir = "/tmp/garage/data" - replication_mode = "3" + replication_factor = 3 rpc_bind_addr = "[::]:3901" 
rpc_secret_file = "{}" @@ -185,7 +185,7 @@ mod tests { r#" metadata_dir = "/tmp/garage/meta" data_dir = "/tmp/garage/data" - replication_mode = "3" + replication_factor = 3 rpc_bind_addr = "[::]:3901" rpc_secret_file = "{}" allow_world_readable_secrets = true @@ -296,7 +296,7 @@ mod tests { r#" metadata_dir = "/tmp/garage/meta" data_dir = "/tmp/garage/data" - replication_mode = "3" + replication_factor = 3 rpc_bind_addr = "[::]:3901" rpc_secret= "dummy" rpc_secret_file = "dummy" diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index ebc82f37..f1c1efc8 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -54,7 +54,7 @@ metadata_dir = "{path}/meta" data_dir = "{path}/data" db_engine = "lmdb" -replication_mode = "1" +replication_factor = 1 rpc_bind_addr = "127.0.0.1:{rpc_port}" rpc_public_addr = "127.0.0.1:{rpc_port}" diff --git a/src/model/garage.rs b/src/model/garage.rs index 561aca8f..19f58077 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -9,7 +9,7 @@ use garage_util::config::*; use garage_util::error::*; use garage_util::persister::PersisterShared; -use garage_rpc::replication_mode::ReplicationMode; +use garage_rpc::replication_mode::*; use garage_rpc::system::System; use garage_block::manager::*; @@ -39,8 +39,8 @@ pub struct Garage { /// The set of background variables that can be viewed/modified at runtime pub bg_vars: vars::BgVars, - /// The replication mode of this cluster - pub replication_mode: ReplicationMode, + /// The replication factor of this cluster + pub replication_factor: ReplicationFactor, /// The local database pub db: db::Db, @@ -222,27 +222,26 @@ impl Garage { .and_then(|x| NetworkKey::from_slice(&x)) .ok_or_message("Invalid RPC secret key")?; - let replication_mode = ReplicationMode::parse(&config.replication_mode) - .ok_or_message("Invalid replication_mode in config file.")?; + let (replication_factor, consistency_mode) = parse_replication_mode(&config)?; info!("Initialize background variable system..."); let mut bg_vars = vars::BgVars::new(); info!("Initialize membership management system..."); - let system = System::new(network_key, replication_mode, &config)?; + let system = System::new(network_key, replication_factor, consistency_mode, &config)?; let data_rep_param = TableShardedReplication { system: system.clone(), - replication_factor: replication_mode.replication_factor(), - write_quorum: replication_mode.write_quorum(), + replication_factor: replication_factor.into(), + write_quorum: replication_factor.write_quorum(consistency_mode), read_quorum: 1, }; let meta_rep_param = TableShardedReplication { system: system.clone(), - replication_factor: replication_mode.replication_factor(), - write_quorum: replication_mode.write_quorum(), - read_quorum: replication_mode.read_quorum(), + replication_factor: replication_factor.into(), + write_quorum: replication_factor.write_quorum(consistency_mode), + read_quorum: replication_factor.read_quorum(consistency_mode), }; let control_rep_param = TableFullReplication { @@ -338,7 +337,7 @@ impl Garage { Ok(Arc::new(Self { config, bg_vars, - replication_mode, + replication_factor, db, system, block_manager, diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 9fb738ea..2835347a 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; 
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct RpcLayoutDigest { @@ -29,7 +29,8 @@ pub struct SyncLayoutDigest { } pub struct LayoutHelper { - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, layout: Option, // cached values @@ -57,7 +58,8 @@ impl Deref for LayoutHelper { impl LayoutHelper { pub fn new( - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, mut layout: LayoutHistory, mut ack_lock: HashMap, ) -> Self { @@ -66,7 +68,7 @@ impl LayoutHelper { // correct and we have rapid access to important values such as // the layout versions to use when reading to ensure consistency. - if !replication_mode.is_read_after_write_consistent() { + if consistency_mode != ConsistencyMode::Consistent { // Fast path for when no consistency is required. // In this case we only need to keep the last version of the layout, // we don't care about coordinating stuff in the cluster. @@ -103,7 +105,7 @@ impl LayoutHelper { // This value is calculated using quorums to allow progress even // if not all nodes have successfully completed a sync. let sync_map_min = - layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes); + layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes); let trackers_hash = layout.calculate_trackers_hash(); let staging_hash = layout.calculate_staging_hash(); @@ -114,7 +116,8 @@ impl LayoutHelper { .or_insert(AtomicUsize::new(0)); LayoutHelper { - replication_mode, + replication_factor, + consistency_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -139,7 +142,8 @@ impl LayoutHelper { let changed = f(self.layout.as_mut().unwrap()); if changed { *self = Self::new( - self.replication_mode, + self.replication_factor, + self.consistency_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b8cc27da..290f058d 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -6,11 +6,11 @@ use garage_util::encode::nonversioned_encode; use garage_util::error::*; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; impl LayoutHistory { - pub fn new(replication_factor: usize) -> Self { - let version = LayoutVersion::new(replication_factor); + pub fn new(replication_factor: ReplicationFactor) -> Self { + let version = LayoutVersion::new(replication_factor.into()); let staging = LayoutStaging { parameters: Lww::::new(version.parameters), @@ -119,7 +119,7 @@ impl LayoutHistory { pub(crate) fn calculate_sync_map_min_with_quorum( &self, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, all_nongateway_nodes: &[Uuid], ) -> u64 { // This function calculates the minimum layout version from which @@ -133,7 +133,7 @@ impl LayoutHistory { return self.current().version; } - let quorum = replication_mode.write_quorum(); + let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent); let min_version = self.min_stored(); let global_min = self diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 0b6c7e63..8a6eb1c3 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -14,13 +14,13 @@ use garage_util::error::*; use garage_util::persister::Persister; use super::*; -use crate::replication_mode::ReplicationMode; +use crate::replication_mode::*; use crate::rpc_helper::*; use 
crate::system::*; pub struct LayoutManager { node_id: Uuid, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, persist_cluster_layout: Persister, layout: Arc>, @@ -38,20 +38,19 @@ impl LayoutManager { node_id: NodeID, system_endpoint: Arc>, peering: Arc, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, ) -> Result, Error> { - let replication_factor = replication_mode.replication_factor(); - let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); let cluster_layout = match persist_cluster_layout.load() { Ok(x) => { - if x.current().replication_factor != replication_mode.replication_factor() { + if x.current().replication_factor != replication_factor.replication_factor() { return Err(Error::Message(format!( "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", x.current().replication_factor, - replication_factor + replication_factor.replication_factor() ))); } x @@ -65,8 +64,12 @@ impl LayoutManager { } }; - let mut cluster_layout = - LayoutHelper::new(replication_mode, cluster_layout, Default::default()); + let mut cluster_layout = LayoutHelper::new( + replication_factor, + consistency_mode, + cluster_layout, + Default::default(), + ); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -81,7 +84,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), - replication_mode, + replication_factor, persist_cluster_layout, layout, change_notify, @@ -295,11 +298,11 @@ impl LayoutManager { adv.update_trackers ); - if adv.current().replication_factor != self.replication_mode.replication_factor() { + if adv.current().replication_factor != self.replication_factor.replication_factor() { let msg = format!( "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", adv.current().replication_factor, - self.replication_mode.replication_factor() + self.replication_factor.replication_factor() ); error!("{}", msg); return Err(Error::Message(msg)); diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index 88eb518e..fcbb9dfc 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -5,6 +5,7 @@ use garage_util::crdt::Crdt; use garage_util::error::*; use crate::layout::*; +use crate::replication_mode::ReplicationFactor; // This function checks that the partition size S computed is at least better than the // one given by a very naive algorithm. 
To do so, we try to run the naive algorithm @@ -120,7 +121,7 @@ fn test_assignment() { let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_zone_vec = vec!["A", "B", "C"]; - let mut cl = LayoutHistory::new(3); + let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap()); update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); let v = cl.current().version; let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs index b142ea10..a3a94085 100644 --- a/src/rpc/replication_mode.rs +++ b/src/rpc/replication_mode.rs @@ -1,57 +1,94 @@ -#[derive(Clone, Copy)] -pub enum ReplicationMode { - None, - TwoWay, - TwoWayDangerous, - ThreeWay, - ThreeWayDegraded, - ThreeWayDangerous, +use garage_util::config::Config; +use garage_util::crdt::AutoCrdt; +use garage_util::error::*; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ReplicationFactor(usize); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ConsistencyMode { + /// Read- and Write-quorum are 1 + Dangerous, + /// Read-quorum is 1 + Degraded, + /// Read- and Write-quorum are determined for read-after-write-consistency + #[default] + Consistent, } -impl ReplicationMode { - pub fn parse(v: &str) -> Option { - match v { - "none" | "1" => Some(Self::None), - "2" => Some(Self::TwoWay), - "2-dangerous" => Some(Self::TwoWayDangerous), - "3" => Some(Self::ThreeWay), - "3-degraded" => Some(Self::ThreeWayDegraded), - "3-dangerous" => Some(Self::ThreeWayDangerous), - _ => None, +impl ConsistencyMode { + pub fn parse(s: &str) -> Option { + serde_json::from_value(serde_json::Value::String(s.to_string())).ok() + } +} + +impl AutoCrdt for ConsistencyMode { + const WARN_IF_DIFFERENT: bool = true; +} + +impl ReplicationFactor { + pub fn new(replication_factor: usize) -> Option { + if replication_factor < 1 { + None + } else { + Some(Self(replication_factor)) } } pub fn replication_factor(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay | Self::TwoWayDangerous => 2, - Self::ThreeWay | Self::ThreeWayDegraded | Self::ThreeWayDangerous => 3, + self.0 + } + + pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize { + match consistency_mode { + ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1, + ConsistencyMode::Consistent => self.replication_factor().div_ceil(2), } } - pub fn read_quorum(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay | Self::TwoWayDangerous => 1, - Self::ThreeWay => 2, - Self::ThreeWayDegraded | Self::ThreeWayDangerous => 1, - } - } - - pub fn write_quorum(&self) -> usize { - match self { - Self::None => 1, - Self::TwoWay => 2, - Self::TwoWayDangerous => 1, - Self::ThreeWay | Self::ThreeWayDegraded => 2, - Self::ThreeWayDangerous => 1, - } - } - - pub fn is_read_after_write_consistent(&self) -> bool { - match self { - Self::None | Self::TwoWay | Self::ThreeWay => true, - _ => false, + pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize { + match consistency_mode { + ConsistencyMode::Dangerous => 1, + ConsistencyMode::Degraded | ConsistencyMode::Consistent => { + (self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent) + } } } } + +impl std::convert::From for usize { + fn from(replication_factor: ReplicationFactor) -> usize { + replication_factor.0 + } +} + +pub 
fn parse_replication_mode( + config: &Config, +) -> Result<(ReplicationFactor, ConsistencyMode), Error> { + match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) { + (Some(replication_mode), None, "consistent") => { + tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode"); + let parsed_replication_mode = match replication_mode.as_str() { + "1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)), + "2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)), + "2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)), + "3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)), + "3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)), + "3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)), + _ => None, + }; + Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?) + }, + (None, Some(replication_factor), consistency_mode) => { + let replication_factor = ReplicationFactor::new(replication_factor) + .ok_or_message("Invalid replication_factor in config file.")?; + let consistency_mode = ConsistencyMode::parse(consistency_mode) + .ok_or_message("Invalid consistency_mode in config file.")?; + Some((replication_factor, consistency_mode)) + } + _ => None, + }.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.") +} diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1c668306..54d589d2 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -112,8 +112,7 @@ pub struct System { metrics: ArcSwapOption, - replication_mode: ReplicationMode, - pub(crate) replication_factor: usize, + pub(crate) replication_factor: ReplicationFactor, /// Path to metadata directory pub metadata_dir: PathBuf, @@ -243,7 +242,8 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - replication_mode: ReplicationMode, + replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, config: &Config, ) -> Result, Error> { // ---- setup netapp RPC protocol ---- @@ -274,14 +274,13 @@ impl System { let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); // ---- setup cluster layout and layout manager ---- - let replication_factor = replication_mode.replication_factor(); - let layout_manager = LayoutManager::new( config, netapp.id, system_endpoint.clone(), peering.clone(), - replication_mode, + replication_factor, + consistency_mode, )?; let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); @@ -315,7 +314,6 @@ impl System { netapp: netapp.clone(), peering: peering.clone(), system_endpoint, - replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, rpc_public_addr, @@ -427,7 +425,9 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let quorum = self.replication_mode.write_quorum(); + let quorum = self + .replication_factor + .write_quorum(ConsistencyMode::Consistent); // Gather information about running nodes. 
// Technically, `nodes` contains currently running nodes, as well @@ -631,7 +631,7 @@ impl System { .count(); let not_configured = self.cluster_layout().check().is_err(); - let no_peers = n_connected < self.replication_factor; + let no_peers = n_connected < self.replication_factor.into(); let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = n_connected != expected_n_nodes; @@ -774,14 +774,14 @@ impl EndpointHandler for System { } impl NodeStatus { - fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self { + fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self { NodeStatus { hostname: Some( gethostname::gethostname() .into_string() .unwrap_or_else(|_| "".to_string()), ), - replication_factor, + replication_factor: replication_factor.into(), layout_digest: layout_manager.layout().digest(), meta_disk_avail: None, data_disk_avail: None, diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs index 0bb55bf3..a64daec8 100644 --- a/src/rpc/system_metrics.rs +++ b/src/rpc/system_metrics.rs @@ -68,7 +68,7 @@ impl SystemMetrics { let replication_factor = system.replication_factor; meter .u64_value_observer("garage_replication_factor", move |observer| { - observer.observe(replication_factor as u64, &[]) + observer.observe(replication_factor.replication_factor() as u64, &[]) }) .with_description("Garage replication factor setting") .init() diff --git a/src/util/config.rs b/src/util/config.rs index 056c625d..b7f27676 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -30,12 +30,20 @@ pub struct Config { )] pub block_size: usize, - /// Replication mode. Supported values: - /// - none, 1 -> no replication - /// - 2 -> 2-way replication - /// - 3 -> 3-way replication - // (we can add more aliases for this later) - pub replication_mode: String, + /// Number of replicas. Can be any positive integer, but uneven numbers are more favorable. + /// - 1 for single-node clusters, or to disable replication + /// - 3 is the recommended and supported setting. 
+ #[serde(default)] + pub replication_factor: Option, + + /// Consistency mode for all for requests through this node + /// - Degraded -> Disable read quorum + /// - Dangerous -> Disable read and write quorum + #[serde(default = "default_consistency_mode")] + pub consistency_mode: String, + + /// Legacy option + pub replication_mode: Option, /// Zstd compression level used on data blocks #[serde( @@ -244,10 +252,15 @@ fn default_sled_cache_capacity() -> usize { fn default_sled_flush_every_ms() -> u64 { 2000 } + fn default_block_size() -> usize { 1048576 } +fn default_consistency_mode() -> String { + "consistent".into() +} + fn default_compression() -> Option { Some(1) } @@ -359,7 +372,7 @@ mod tests { r#" metadata_dir = "/tmp/garage/meta" data_dir = "/tmp/garage/data" - replication_mode = "3" + replication_factor = 3 rpc_bind_addr = "[::]:3901" rpc_secret = "foo" -- 2.43.4 From 060f83edf182a7d9c9a68823aee9a184d5a3c13b Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 18:43:08 +0100 Subject: [PATCH 3/4] other refactors --- src/block/manager.rs | 11 ++--------- src/block/resync.rs | 10 ++++++---- src/model/garage.rs | 13 +------------ src/rpc/layout/helper.rs | 13 +------------ src/rpc/layout/manager.rs | 18 ++++++++++++------ src/table/replication/sharded.rs | 24 +++++++++--------------- src/util/crdt/lww.rs | 2 +- 7 files changed, 32 insertions(+), 59 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index f4d8ee56..218ef9eb 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -33,8 +33,6 @@ use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; -use garage_table::replication::{TableReplication, TableShardedReplication}; - use crate::block::*; use crate::layout::*; use crate::metrics::*; @@ -74,9 +72,6 @@ impl Rpc for BlockRpc { /// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { - /// Replication strategy, allowing to find on which node blocks should be located - pub replication: TableShardedReplication, - /// Data layout pub(crate) data_layout: ArcSwap, /// Data layout persister @@ -122,7 +117,6 @@ impl BlockManager { data_dir: DataDirEnum, data_fsync: bool, compression_level: Option, - replication: TableShardedReplication, system: Arc, ) -> Result, Error> { // Load or compute layout, i.e. 
assignment of data blocks to the different data directories @@ -163,7 +157,6 @@ impl BlockManager { let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); let block_manager = Arc::new(Self { - replication, data_layout: ArcSwap::new(Arc::new(data_layout)), data_layout_persister, data_fsync, @@ -354,7 +347,7 @@ impl BlockManager { data: Bytes, order_tag: Option, ) -> Result<(), Error> { - let who = self.replication.write_sets(&hash); + let who = self.system.layout_manager.write_sets_of(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await @@ -374,7 +367,7 @@ impl BlockManager { who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) - .with_quorum(self.replication.write_quorum()), + .with_quorum(self.system.layout_manager.write_quorum()), ) .await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index 15f210e4..180e7bcf 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -28,8 +28,6 @@ use garage_util::tranquilizer::Tranquilizer; use garage_rpc::system::System; use garage_rpc::*; -use garage_table::replication::TableReplication; - use crate::manager::*; // The delay between the time where a resync operation fails @@ -377,8 +375,12 @@ impl BlockResyncManager { info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); - let mut who = manager.replication.storage_nodes(hash); - if who.len() < manager.replication.write_quorum() { + let mut who = manager + .system + .layout_manager + .layout() + .storage_nodes_of(hash); + if who.len() < manager.system.layout_manager.write_quorum() { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } who.retain(|id| *id != manager.system.id); diff --git a/src/model/garage.rs b/src/model/garage.rs index 19f58077..482e187d 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -230,18 +230,8 @@ impl Garage { info!("Initialize membership management system..."); let system = System::new(network_key, replication_factor, consistency_mode, &config)?; - let data_rep_param = TableShardedReplication { - system: system.clone(), - replication_factor: replication_factor.into(), - write_quorum: replication_factor.write_quorum(consistency_mode), - read_quorum: 1, - }; - let meta_rep_param = TableShardedReplication { - system: system.clone(), - replication_factor: replication_factor.into(), - write_quorum: replication_factor.write_quorum(consistency_mode), - read_quorum: replication_factor.read_quorum(consistency_mode), + layout_manager: system.layout_manager.clone(), }; let control_rep_param = TableFullReplication { @@ -254,7 +244,6 @@ impl Garage { config.data_dir.clone(), config.data_fsync, config.compression_level, - data_rep_param, system.clone(), )?; block_manager.register_bg_vars(&mut bg_vars); diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 2835347a..cd72e2d3 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::*; -use crate::replication_mode::*; +use crate::replication_mode::ReplicationFactor; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct RpcLayoutDigest { @@ -30,7 +30,6 @@ pub struct SyncLayoutDigest { pub struct LayoutHelper { replication_factor: ReplicationFactor, - consistency_mode: ConsistencyMode, layout: Option, // cached values @@ -59,7 +58,6 @@ impl Deref for 
LayoutHelper { impl LayoutHelper { pub fn new( replication_factor: ReplicationFactor, - consistency_mode: ConsistencyMode, mut layout: LayoutHistory, mut ack_lock: HashMap, ) -> Self { @@ -68,13 +66,6 @@ impl LayoutHelper { // correct and we have rapid access to important values such as // the layout versions to use when reading to ensure consistency. - if consistency_mode != ConsistencyMode::Consistent { - // Fast path for when no consistency is required. - // In this case we only need to keep the last version of the layout, - // we don't care about coordinating stuff in the cluster. - layout.keep_current_version_only(); - } - layout.cleanup_old_versions(); let all_nodes = layout.get_all_nodes(); @@ -117,7 +108,6 @@ impl LayoutHelper { LayoutHelper { replication_factor, - consistency_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -143,7 +133,6 @@ impl LayoutHelper { if changed { *self = Self::new( self.replication_factor, - self.consistency_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 8a6eb1c3..846eea47 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -21,6 +21,7 @@ use crate::system::*; pub struct LayoutManager { node_id: Uuid, replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, persist_cluster_layout: Persister, layout: Arc>, @@ -64,12 +65,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new( - replication_factor, - consistency_mode, - cluster_layout, - Default::default(), - ); + let mut cluster_layout = + LayoutHelper::new(replication_factor, cluster_layout, Default::default()); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -85,6 +82,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), replication_factor, + consistency_mode, persist_cluster_layout, layout, change_notify, @@ -141,6 +139,14 @@ impl LayoutManager { } } + pub fn read_quorum(self: &Arc) -> usize { + self.replication_factor.read_quorum(self.consistency_mode) + } + + pub fn write_quorum(self: &Arc) -> usize { + self.replication_factor.write_quorum(self.consistency_mode) + } + // ---- ACK LOCKING ---- pub fn write_sets_of(self: &Arc, position: &Hash) -> WriteLock>> { diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index e0245949..fa5e48d7 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,7 +1,7 @@ use std::sync::Arc; +use garage_rpc::layout::manager::LayoutManager; use garage_rpc::layout::*; -use garage_rpc::system::System; use garage_util::data::*; use crate::replication::*; @@ -15,42 +15,36 @@ use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { /// The membership manager of this node - pub system: Arc, - /// How many time each data should be replicated - pub replication_factor: usize, - /// How many nodes to contact for a read, should be at most `replication_factor` - pub read_quorum: usize, - /// How many nodes to contact for a write, should be at most `replication_factor` - pub write_quorum: usize, + pub layout_manager: Arc, } impl TableReplication for TableShardedReplication { type WriteSets = WriteLock>>; fn storage_nodes(&self, hash: &Hash) -> Vec { - self.system.cluster_layout().storage_nodes_of(hash) + self.layout_manager.layout().storage_nodes_of(hash) } fn read_nodes(&self, hash: &Hash) -> Vec { - self.system.cluster_layout().read_nodes_of(hash) + 
self.layout_manager.layout().read_nodes_of(hash) } fn read_quorum(&self) -> usize { - self.read_quorum + self.layout_manager.read_quorum() } fn write_sets(&self, hash: &Hash) -> Self::WriteSets { - self.system.layout_manager.write_sets_of(hash) + self.layout_manager.write_sets_of(hash) } fn write_quorum(&self) -> usize { - self.write_quorum + self.layout_manager.write_quorum() } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.cluster_layout().current().partition_of(hash) + self.layout_manager.layout().current().partition_of(hash) } fn sync_partitions(&self) -> SyncPartitions { - let layout = self.system.cluster_layout(); + let layout = self.layout_manager.layout(); let layout_version = layout.ack_map_min(); let mut partitions = layout diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs index 958844c9..0677a673 100644 --- a/src/util/crdt/lww.rs +++ b/src/util/crdt/lww.rs @@ -84,7 +84,7 @@ where &self.v } - /// Take the value inside the CRDT (discards the timesamp) + /// Take the value inside the CRDT (discards the timestamp) pub fn take(self) -> T { self.v } -- 2.43.4 From 41a17ce14a0f0dffed84b9e2e8a429dcd53120f3 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 16:58:06 +0100 Subject: [PATCH 4/4] Per-Bucket Consistency --- src/api/admin/bucket.rs | 28 ++++++--- src/api/admin/key.rs | 14 +++-- src/api/k2v/batch.rs | 38 +++++++++--- src/api/k2v/index.rs | 6 +- src/api/k2v/item.rs | 24 ++++++-- src/api/k2v/range.rs | 3 + src/api/s3/bucket.rs | 8 +-- src/api/s3/copy.rs | 65 +++++++++++++-------- src/api/s3/cors.rs | 4 +- src/api/s3/delete.rs | 11 +++- src/api/s3/get.rs | 51 ++++++++++++---- src/api/s3/lifecycle.rs | 4 +- src/api/s3/list.rs | 16 ++++- src/api/s3/multipart.rs | 66 +++++++++++++++------ src/api/s3/put.rs | 45 ++++++++++---- src/api/s3/website.rs | 4 +- src/api/signature/payload.rs | 2 +- src/block/manager.rs | 4 +- src/block/resync.rs | 7 ++- src/garage/admin/block.rs | 58 ++++++++++++++---- src/garage/admin/bucket.rs | 46 +++++++-------- src/garage/admin/key.rs | 19 +++--- src/garage/repair/online.rs | 20 ++++--- src/model/bucket_table.rs | 91 ++++++++++++++++++++++++++++- src/model/helper/bucket.rs | 31 ++++++++-- src/model/helper/key.rs | 5 +- src/model/helper/locked.rs | 36 +++++++----- src/model/k2v/rpc.rs | 59 +++++++++++++------ src/model/migrate.rs | 31 +++++----- src/model/s3/lifecycle_worker.rs | 2 +- src/rpc/layout/manager.rs | 10 ++-- src/table/queue.rs | 2 +- src/table/replication/fullcopy.rs | 5 +- src/table/replication/parameters.rs | 5 +- src/table/replication/sharded.rs | 10 ++-- src/table/sync.rs | 4 +- src/table/table.rs | 35 +++++++---- src/web/web_server.rs | 32 ++++++++-- 38 files changed, 658 insertions(+), 243 deletions(-) diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index ac3cba00..4d53cc16 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -10,6 +10,8 @@ use garage_util::time::*; use garage_table::*; +use garage_rpc::replication_mode::ConsistencyMode; + use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; @@ -27,6 +29,7 @@ pub async fn handle_list_buckets(garage: &Arc) -> Result>(), + consistency_mode: *state.consistency_mode.get(), website_access: state.website_config.get().is_some(), website_config: state.website_config.get().clone().map(|wsc| { GetBucketInfoWebsiteResult { @@ -238,6 +244,7 @@ async fn bucket_info_results( struct GetBucketInfoResult { id: String, global_aliases: Vec, + consistency_mode: ConsistencyMode, 
website_access: bool, #[serde(default)] website_config: Option, @@ -283,7 +290,7 @@ pub async fn handle_create_bucket( ))); } - if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { + if let Some(alias) = garage.bucket_alias_table.get((), &EmptyKey, ga).await? { if alias.state.get().is_some() { return Err(CommonError::BucketAlreadyExists.into()); } @@ -306,7 +313,7 @@ pub async fn handle_create_bucket( } let bucket = Bucket::new(); - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; if let Some(ga) = &req.global_alias { helper.set_global_bucket_alias(bucket.id, ga).await?; @@ -394,7 +401,7 @@ pub async fn handle_delete_bucket( // 4. delete bucket bucket.state = Deletable::delete(); - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -416,6 +423,10 @@ pub async fn handle_update_bucket( let state = bucket.state.as_option_mut().unwrap(); + if let Some(cm) = req.consistency_mode { + state.consistency_mode.update(cm); + } + if let Some(wa) = req.website_access { if wa.enabled { state.website_config.update(Some(WebsiteConfig { @@ -441,7 +452,7 @@ pub async fn handle_update_bucket( }); } - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; bucket_info_results(garage, bucket_id).await } @@ -449,6 +460,7 @@ pub async fn handle_update_bucket( #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct UpdateBucketRequest { + consistency_mode: Option, website_access: Option, quotas: Option, } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 291b6d54..9f1a152c 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -17,6 +17,7 @@ pub async fn handle_list_keys(garage: &Arc) -> Result, let res = garage .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), @@ -68,7 +69,7 @@ pub async fn handle_create_key( let req = parse_json_body::(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); - garage.key_table.insert(&key).await?; + garage.key_table.insert((), &key).await?; key_info_results(garage, key, true).await } @@ -85,7 +86,10 @@ pub async fn handle_import_key( ) -> Result, Error> { let req = parse_json_body::(req).await?; - let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; + let prev_key = garage + .key_table + .get((), &EmptyKey, &req.access_key_id) + .await?; if prev_key.is_some() { return Err(Error::KeyAlreadyExists(req.access_key_id.to_string())); } @@ -96,7 +100,7 @@ pub async fn handle_import_key( req.name.as_deref().unwrap_or("Imported key"), ) .ok_or_bad_request("Invalid key format")?; - garage.key_table.insert(&imported_key).await?; + garage.key_table.insert((), &imported_key).await?; key_info_results(garage, imported_key, false).await } @@ -134,7 +138,7 @@ pub async fn handle_update_key( } } - garage.key_table.insert(&key).await?; + garage.key_table.insert((), &key).await?; key_info_results(garage, key, false).await } @@ -184,7 +188,7 @@ async fn key_info_results( .filter_map(|(_, _, v)| v.as_ref()), ) { if !relevant_buckets.contains_key(id) { - if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? { + if let Some(b) = garage.bucket_table.get((), &EmptyKey, id).await? 
{ if b.state.as_option().is_some() { relevant_buckets.insert(*id, b); } diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 02b7ae8b..b3d96dff 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -17,7 +17,10 @@ pub async fn handle_insert_batch( req: Request, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let items = parse_json_body::, _, Error>(req).await?; @@ -35,7 +38,11 @@ pub async fn handle_insert_batch( items2.push((it.pk, it.sk, ct, v)); } - garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; + garage + .k2v + .rpc + .insert_batch(*bucket_params.consistency_mode.get(), *bucket_id, items2) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -68,8 +75,12 @@ async fn handle_read_batch_query( query: ReadBatchQuery, ) -> Result { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); let partition = K2VItemPartition { bucket_id: *bucket_id, @@ -92,7 +103,7 @@ async fn handle_read_batch_query( let item = garage .k2v .item_table - .get(&partition, sk) + .get(c, &partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { @@ -109,6 +120,7 @@ async fn handle_read_batch_query( query.limit, Some(filter), EnumerationOrder::from_reverse(query.reverse), + c, ) .await?; @@ -162,8 +174,12 @@ async fn handle_delete_batch_query( query: DeleteBatchQuery, ) -> Result { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; + let c = *bucket_params.consistency_mode.get(); let partition = K2VItemPartition { bucket_id: *bucket_id, @@ -186,7 +202,7 @@ async fn handle_delete_batch_query( let item = garage .k2v .item_table - .get(&partition, sk) + .get(c, &partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { @@ -196,6 +212,7 @@ async fn handle_delete_batch_query( .k2v .rpc .insert( + c, *bucket_id, i.partition.partition_key, i.sort_key, @@ -217,6 +234,7 @@ async fn handle_delete_batch_query( None, Some(filter), EnumerationOrder::Forward, + c, ) .await?; assert!(!more); @@ -236,7 +254,7 @@ async fn handle_delete_batch_query( .collect::>(); let n = items.len(); - garage.k2v.rpc.insert_batch(*bucket_id, items).await?; + garage.k2v.rpc.insert_batch(c, *bucket_id, items).await?; n }; @@ -257,7 +275,10 @@ pub(crate) async fn handle_poll_range( req: Request, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; use garage_model::k2v::sub::PollRange; @@ -269,6 +290,7 @@ pub(crate) async fn handle_poll_range( .k2v .rpc .poll_range( + *bucket_params.consistency_mode.get(), PollRange { partition: K2VItemPartition { bucket_id, diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e3397238..9e804d05 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -19,7 +19,10 @@ pub async fn handle_read_index( reverse: Option, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. 
} = &ctx; let reverse = reverse.unwrap_or(false); @@ -39,6 +42,7 @@ pub async fn handle_read_index( limit, Some((DeletedFilter::NotDeleted, node_id_vec)), EnumerationOrder::from_reverse(reverse), + *bucket_params.consistency_mode.get(), ) .await?; diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index af3af4e4..dfa30b33 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -101,7 +101,10 @@ pub async fn handle_read_item( sort_key: &String, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let format = ReturnFormat::from(req)?; @@ -110,6 +113,7 @@ pub async fn handle_read_item( .k2v .item_table .get( + *bucket_params.consistency_mode.get(), &K2VItemPartition { bucket_id: *bucket_id, partition_key: partition_key.to_string(), @@ -129,7 +133,10 @@ pub async fn handle_insert_item( sort_key: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let causal_context = req .headers() @@ -149,6 +156,7 @@ pub async fn handle_insert_item( .k2v .rpc .insert( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key.to_string(), sort_key.to_string(), @@ -169,7 +177,10 @@ pub async fn handle_delete_item( sort_key: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let causal_context = req .headers() @@ -185,6 +196,7 @@ pub async fn handle_delete_item( .k2v .rpc .insert( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key.to_string(), sort_key.to_string(), @@ -209,7 +221,10 @@ pub async fn handle_poll_item( timeout_secs: Option, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let format = ReturnFormat::from(req)?; @@ -222,6 +237,7 @@ pub async fn handle_poll_item( .k2v .rpc .poll_item( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key, sort_key, diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index bb9d3be5..4fd5ba04 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -22,6 +23,7 @@ pub(crate) async fn read_range( limit: Option, filter: Option, enumeration_order: EnumerationOrder, + c: ConsistencyMode, ) -> Result<(Vec, bool, Option), Error> where F: TableSchema + 'static, @@ -54,6 +56,7 @@ where ); let get_ret = table .get_range( + c, partition_key, start.clone(), filter.clone(), diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 6a12aa9c..1cf156b9 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -70,10 +70,10 @@ pub async fn handle_list_buckets( let mut aliases = HashMap::new(); for bucket_id in ids.iter() { - let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?; + let bucket = garage.bucket_table.get((), &EmptyKey, bucket_id).await?; if let Some(bucket) = bucket { for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) { - let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?; + let alias_opt = garage.bucket_alias_table.get((), &EmptyKey, alias).await?; if let Some(alias_ent) = alias_opt { if *alias_ent.state.get() == Some(*bucket_id) { aliases.insert(alias_ent.name().to_string(), *bucket_id); @@ -187,7 +187,7 @@ pub async fn handle_create_bucket( } let bucket = Bucket::new(); - garage.bucket_table.insert(&bucket).await?; + 
garage.bucket_table.insert((), &bucket).await?; helper .set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS) @@ -268,7 +268,7 @@ pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result, Erro state: Deletable::delete(), }; // 3. delete bucket - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; } else if is_local_alias { // Just unalias helper diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 3c2bd483..c1160f87 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -9,6 +9,7 @@ use hyper::{Request, Response}; use serde::Serialize; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; @@ -33,13 +34,15 @@ pub async fn handle_copy( ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let source_object = get_copy_source(&ctx, req).await?; + let (source_object, source_c) = get_copy_source(&ctx, req).await?; let ReqCtx { garage, bucket_id: dest_bucket_id, + bucket_params: dest_bucket_params, .. } = ctx; + let dest_c = *dest_bucket_params.consistency_mode.get(); let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -80,13 +83,13 @@ pub async fn handle_copy( dest_key.to_string(), vec![dest_object_version], ); - garage.object_table.insert(&dest_object).await?; + garage.object_table.insert(dest_c, &dest_object).await?; } ObjectVersionData::FirstBlock(_meta, first_block_hash) => { // Get block list from source version let source_version = garage .version_table - .get(&source_version.uuid, &EmptyKey) + .get(source_c, &source_version.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NoSuchKey)?; @@ -106,7 +109,7 @@ pub async fn handle_copy( dest_key.to_string(), vec![tmp_dest_object_version], ); - garage.object_table.insert(&tmp_dest_object).await?; + garage.object_table.insert(dest_c, &tmp_dest_object).await?; // Write version in the version table. Even with empty block list, // this means that the BlockRef entries linked to this version cannot be @@ -120,7 +123,7 @@ pub async fn handle_copy( }, false, ); - garage.version_table.insert(&dest_version).await?; + garage.version_table.insert(dest_c, &dest_version).await?; // Fill in block list for version and insert block refs for (bk, bv) in source_version.blocks.items().iter() { @@ -137,8 +140,10 @@ pub async fn handle_copy( }) .collect::>(); futures::try_join!( - garage.version_table.insert(&dest_version), - garage.block_ref_table.insert_many(&dest_block_refs[..]), + garage.version_table.insert(dest_c, &dest_version), + garage + .block_ref_table + .insert_many(dest_c, &dest_block_refs[..]), )?; // Insert final object @@ -160,7 +165,7 @@ pub async fn handle_copy( dest_key.to_string(), vec![dest_object_version], ); - garage.object_table.insert(&dest_object).await?; + garage.object_table.insert(dest_c, &dest_object).await?; } } @@ -193,12 +198,17 @@ pub async fn handle_upload_part_copy( let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( + let ((source_object, source_c), (_, _, mut dest_mpu)) = futures::try_join!( get_copy_source(&ctx, req), multipart::get_upload(&ctx, &dest_key, &dest_upload_id) )?; - let ReqCtx { garage, .. } = ctx; + let ReqCtx { + garage, + bucket_params: dest_bucket_params, + .. 
+ } = ctx; + let dest_c = *dest_bucket_params.consistency_mode.get(); let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -244,7 +254,7 @@ pub async fn handle_upload_part_copy( // and destination version to check part hasn't yet been uploaded let source_version = garage .version_table - .get(&source_object_version.uuid, &EmptyKey) + .get(source_c, &source_object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -304,7 +314,7 @@ pub async fn handle_upload_part_copy( size: None, }, ); - garage.mpu_table.insert(&dest_mpu).await?; + garage.mpu_table.insert(dest_c, &dest_mpu).await?; let mut dest_version = Version::new( dest_version_id, @@ -390,7 +400,7 @@ pub async fn handle_upload_part_copy( if must_upload { garage2 .block_manager - .rpc_put_block(final_hash, data, None) + .rpc_put_block(dest_c, final_hash, data, None) .await } else { Ok(()) @@ -398,9 +408,9 @@ pub async fn handle_upload_part_copy( }, async { // Thing 2: we need to insert the block in the version - garage.version_table.insert(&dest_version).await?; + garage.version_table.insert(dest_c, &dest_version).await?; // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref).await + garage.block_ref_table.insert(dest_c, &block_ref).await }, // Thing 4: we need to prefetch the next block defragmenter.next(), @@ -422,7 +432,7 @@ pub async fn handle_upload_part_copy( size: Some(current_offset), }, ); - garage.mpu_table.insert(&dest_mpu).await?; + garage.mpu_table.insert(dest_c, &dest_mpu).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { @@ -440,24 +450,33 @@ pub async fn handle_upload_part_copy( .body(string_body(resp_xml))?) } -async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result { +async fn get_copy_source( + ctx: &ReqCtx, + req: &Request, +) -> Result<(Object, ConsistencyMode), Error> { let ReqCtx { garage, api_key, .. 
} = ctx; - let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; - let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; + let (source_bucket_name, source_key) = parse_bucket_key(©_source, None)?; let source_bucket_id = garage .bucket_helper() - .resolve_bucket(&source_bucket.to_string(), api_key) + .resolve_bucket(&source_bucket_name.to_string(), api_key) .await?; + let source_bucket = garage + .bucket_helper() + .get_existing_bucket(source_bucket_id) + .await?; + let source_bucket_state = source_bucket.state.as_option().unwrap(); + let source_c = *source_bucket_state.consistency_mode.get(); + if !api_key.allow_read(&source_bucket_id) { return Err(Error::forbidden(format!( "Reading from bucket {} not allowed for this key", - source_bucket + source_bucket_name ))); } @@ -465,11 +484,11 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result Result, Error> bucket_params.cors_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -91,7 +91,7 @@ pub async fn handle_put_cors( .update(Some(conf.into_garage_cors_config()?)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 57f6f948..8f9152cb 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -14,11 +14,16 @@ use crate::signature::verify_signed_content; async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(bucket_id, &key.to_string()) + .get(c, bucket_id, &key.to_string()) .await? 
.ok_or(Error::NoSuchKey)?; // No need to delete @@ -49,7 +54,7 @@ async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), }], ); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; Ok((deleted_version, del_uuid)) } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ed996fb1..90560e6b 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -14,11 +14,13 @@ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; +use garage_model::bucket_table::BucketParams; use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -136,7 +138,15 @@ pub async fn handle_head( key: &str, part_number: Option, ) -> Result, Error> { - handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await + handle_head_without_ctx( + ctx.garage, + req, + ctx.bucket_id, + &ctx.bucket_params, + key, + part_number, + ) + .await } /// Handle HEAD request for website @@ -144,12 +154,15 @@ pub async fn handle_head_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, + bucket_params: &BucketParams, key: &str, part_number: Option, ) -> Result, Error> { + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(&bucket_id, &key.to_string()) + .get(c, &bucket_id, &key.to_string()) .await? .ok_or(Error::NoSuchKey)?; @@ -194,7 +207,7 @@ pub async fn handle_head_without_ctx( ObjectVersionData::FirstBlock(_, _) => { let version = garage .version_table - .get(&object_version.uuid, &EmptyKey) + .get(c, &object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -234,7 +247,16 @@ pub async fn handle_get( part_number: Option, overrides: GetObjectOverrides, ) -> Result, Error> { - handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await + handle_get_without_ctx( + ctx.garage, + req, + ctx.bucket_id, + &ctx.bucket_params, + key, + part_number, + overrides, + ) + .await } /// Handle GET request @@ -242,13 +264,16 @@ pub async fn handle_get_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, + bucket_params: &BucketParams, key: &str, part_number: Option, overrides: GetObjectOverrides, ) -> Result, Error> { + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(&bucket_id, &key.to_string()) + .get(c, &bucket_id, &key.to_string()) .await? 
.ok_or(Error::NoSuchKey)?; @@ -277,10 +302,11 @@ pub async fn handle_get_without_ctx( (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => handle_get_part(garage, c, last_v, last_v_data, last_v_meta, pn).await, (None, Some(range)) => { handle_get_range( garage, + c, last_v, last_v_data, last_v_meta, @@ -289,12 +315,15 @@ pub async fn handle_get_without_ctx( ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, + (None, None) => { + handle_get_full(garage, c, last_v, last_v_data, last_v_meta, overrides).await + } } } async fn handle_get_full( garage: Arc, + c: ConsistencyMode, version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -321,7 +350,7 @@ async fn handle_get_full( match async { let garage2 = garage.clone(); let version_fut = tokio::spawn(async move { - garage2.version_table.get(&version_uuid, &EmptyKey).await + garage2.version_table.get(c, &version_uuid, &EmptyKey).await }); let stream_block_0 = garage @@ -362,6 +391,7 @@ async fn handle_get_full( async fn handle_get_range( garage: Arc, + c: ConsistencyMode, version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -394,7 +424,7 @@ async fn handle_get_range( ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { let version = garage .version_table - .get(&version.uuid, &EmptyKey) + .get(c, &version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -406,6 +436,7 @@ async fn handle_get_range( async fn handle_get_part( garage: Arc, + c: ConsistencyMode, object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -432,7 +463,7 @@ async fn handle_get_part( ObjectVersionData::FirstBlock(_, _) => { let version = garage .version_table - .get(&object_version.uuid, &EmptyKey) + .get(c, &object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 7eb1c2cb..6a0f19ff 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -44,7 +44,7 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result, E bucket_params.lifecycle_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -78,7 +78,7 @@ pub async fn handle_put_lifecycle( bucket_params.lifecycle_config.update(Some(config)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 302c03f4..70087024 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -63,11 +63,17 @@ pub async fn handle_list( ctx: ReqCtx, query: &ListObjectsQuery, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = &ctx; + let c = *bucket_params.consistency_mode.get(); let io = |bucket, key, count| { let t = &garage.object_table; async move { t.get_range( + c, &bucket, key, Some(ObjectFilter::IsData), @@ -169,12 +175,18 @@ pub async fn handle_list_multipart_upload( ctx: ReqCtx, query: &ListMultipartUploadsQuery, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. 
+ } = &ctx; + let c = *bucket_params.consistency_mode.get(); let io = |bucket, key, count| { let t = &garage.object_table; async move { t.get_range( + c, &bucket, key, Some(ObjectFilter::IsUploading { diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 1d5aeb26..af9f8418 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -5,6 +5,7 @@ use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::*; use garage_util::data::*; @@ -32,9 +33,12 @@ pub async fn handle_create_multipart_upload( garage, bucket_id, bucket_name, + bucket_params, .. } = &ctx; - let existing_object = garage.object_table.get(&bucket_id, &key).await?; + let c = *bucket_params.consistency_mode.get(); + + let existing_object = garage.object_table.get(c, &bucket_id, key).await?; let upload_id = gen_uuid(); let timestamp = next_timestamp(existing_object.as_ref()); @@ -51,13 +55,13 @@ pub async fn handle_create_multipart_upload( }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; // Send success response let result = s3_xml::InitiateMultipartUploadResult { @@ -79,7 +83,12 @@ pub async fn handle_put_part( upload_id: &str, content_sha256: Option, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = &ctx; + let c = *bucket_params.consistency_mode.get(); let upload_id = decode_upload_id(upload_id)?; @@ -112,6 +121,7 @@ pub async fn handle_put_part( // before everything is finished (cleanup is done using the Drop trait). let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), + consistency_mode: c, upload_id, version_uuid, })); @@ -126,14 +136,14 @@ pub async fn handle_put_part( size: None, }, ); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; let version = Version::new( version_uuid, VersionBacklink::MultipartUpload { upload_id }, false, ); - garage.version_table.insert(&version).await?; + garage.version_table.insert(c, &version).await?; // Copy data to version let (total_size, data_md5sum, data_sha256sum, _) = @@ -157,7 +167,7 @@ pub async fn handle_put_part( size: Some(total_size), }, ); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; // We were not interrupted, everything went fine. // We won't have to clean up on drop. @@ -173,6 +183,7 @@ pub async fn handle_put_part( struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, + consistency_mode: ConsistencyMode, upload_id: Uuid, version_uuid: Uuid, } @@ -193,7 +204,12 @@ impl Drop for InterruptedCleanup { }, true, ); - if let Err(e) = info.garage.version_table.insert(&version).await { + if let Err(e) = info + .garage + .version_table + .insert(info.consistency_mode, &version) + .await + { warn!("Cannot cleanup after aborted UploadPart: {}", e); } }); @@ -212,8 +228,10 @@ pub async fn handle_complete_multipart_upload( garage, bucket_id, bucket_name, + bucket_params, .. 
} = &ctx; + let c = *bucket_params.consistency_mode.get(); let body = http_body_util::BodyExt::collect(req.into_body()) .await? @@ -279,7 +297,7 @@ pub async fn handle_complete_multipart_upload( let grg = &garage; let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move { grg.version_table - .get(&p.version, &EmptyKey) + .get(c, &p.version, &EmptyKey) .await? .ok_or_internal_error("Part version missing from version table") })) @@ -308,14 +326,14 @@ pub async fn handle_complete_multipart_upload( ); } } - garage.version_table.insert(&final_version).await?; + garage.version_table.insert(c, &final_version).await?; let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef { block: b.hash, version: upload_id, deleted: false.into(), }); - garage.block_ref_table.insert_many(block_refs).await?; + garage.block_ref_table.insert_many(c, block_refs).await?; // Calculate etag of final object // To understand how etags are calculated, read more here: @@ -336,7 +354,7 @@ pub async fn handle_complete_multipart_upload( if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; return Err(e); } @@ -352,7 +370,7 @@ pub async fn handle_complete_multipart_upload( )); let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { @@ -373,8 +391,12 @@ pub async fn handle_abort_multipart_upload( upload_id: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; + let c = *bucket_params.consistency_mode.get(); let upload_id = decode_upload_id(upload_id)?; @@ -382,7 +404,7 @@ pub async fn handle_abort_multipart_upload( object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; Ok(Response::new(empty_body())) } @@ -396,13 +418,21 @@ pub(crate) async fn get_upload( upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); + let (object, mpu) = futures::try_join!( - garage.object_table.get(bucket_id, key).map_err(Error::from), + garage + .object_table + .get(c, bucket_id, key) + .map_err(Error::from), garage .mpu_table - .get(upload_id, &EmptyKey) + .get(c, upload_id, &EmptyKey) .map_err(Error::from), )?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index f06aa7a2..b235c1b8 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; @@ -71,13 +72,20 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256: Option, ) -> Result<(Uuid, String), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. 
} = ctx; + let c = *bucket_params.consistency_mode.get(); let mut chunker = StreamChunker::new(body, garage.config.block_size); let (first_block_opt, existing_object) = try_join!( chunker.next(), - garage.object_table.get(bucket_id, key).map_err(Error::from), + garage + .object_table + .get(c, bucket_id, key) + .map_err(Error::from), )?; let first_block = first_block_opt.unwrap_or_default(); @@ -120,7 +128,7 @@ pub(crate) async fn save_stream> + Unpin>( }; let object = Object::new(*bucket_id, key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; return Ok((version_uuid, data_md5sum_hex)); } @@ -131,6 +139,7 @@ pub(crate) async fn save_stream> + Unpin>( let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), bucket_id: *bucket_id, + consistency_mode: c, key: key.into(), version_uuid, version_timestamp, @@ -147,7 +156,7 @@ pub(crate) async fn save_stream> + Unpin>( }, }; let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, @@ -161,7 +170,7 @@ pub(crate) async fn save_stream> + Unpin>( }, false, ); - garage.version_table.insert(&version).await?; + garage.version_table.insert(c, &version).await?; // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = @@ -187,7 +196,7 @@ pub(crate) async fn save_stream> + Unpin>( first_block_hash, )); let object = Object::new(*bucket_id, key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // We were not interrupted, everything went fine. // We won't have to clean up on drop. @@ -235,6 +244,7 @@ pub(crate) async fn check_quotas( bucket_params, .. } = ctx; + let c = *bucket_params.consistency_mode.get(); let quotas = bucket_params.quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { @@ -244,7 +254,7 @@ pub(crate) async fn check_quotas( let counters = garage .object_counter_table .table - .get(bucket_id, &EmptyKey) + .get(c, bucket_id, &EmptyKey) .await?; let counters = counters @@ -451,7 +461,12 @@ async fn put_block_and_meta( block: Bytes, order_tag: OrderTag, ) -> Result<(), GarageError> { - let ReqCtx { garage, .. } = ctx; + let ReqCtx { + garage, + bucket_params, + .. 
+ } = ctx; + let c = *bucket_params.consistency_mode.get(); let mut version = version.clone(); version.blocks.put( @@ -474,9 +489,9 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), - garage.version_table.insert(&version), - garage.block_ref_table.insert(&block_ref), + .rpc_put_block(c, hash, block, Some(order_tag)), + garage.version_table.insert(c, &version), + garage.block_ref_table.insert(c, &block_ref), )?; Ok(()) } @@ -529,6 +544,7 @@ struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, bucket_id: Uuid, + consistency_mode: ConsistencyMode, key: String, version_uuid: Uuid, version_timestamp: u64, @@ -549,7 +565,12 @@ impl Drop for InterruptedCleanup { state: ObjectVersionState::Aborted, }; let object = Object::new(info.bucket_id, info.key, vec![object_version]); - if let Err(e) = info.garage.object_table.insert(&object).await { + if let Err(e) = info + .garage + .object_table + .insert(info.consistency_mode, &object) + .await + { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 6af55677..3f2da68a 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -49,7 +49,7 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result, Err bucket_params.website_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -83,7 +83,7 @@ pub async fn handle_put_website( .update(Some(conf.into_garage_website_config()?)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index d72736bb..f34ce186 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -370,7 +370,7 @@ pub async fn verify_v4( let key = garage .key_table - .get(&EmptyKey, &auth.key_id) + .get((), &EmptyKey, &auth.key_id) .await? 
.filter(|k| !k.state.is_deleted()) .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?; diff --git a/src/block/manager.rs b/src/block/manager.rs index 218ef9eb..c8fc40cf 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -29,6 +29,7 @@ use garage_util::metrics::RecordDuration; use garage_util::persister::{Persister, PersisterShared}; use garage_util::time::msec_to_rfc3339; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; @@ -343,6 +344,7 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block( &self, + consistency_mode: ConsistencyMode, hash: Hash, data: Bytes, order_tag: Option, @@ -367,7 +369,7 @@ impl BlockManager { who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) - .with_quorum(self.system.layout_manager.write_quorum()), + .with_quorum(self.system.layout_manager.write_quorum(consistency_mode)), ) .await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index 180e7bcf..62c3ed21 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -380,7 +380,12 @@ impl BlockResyncManager { .layout_manager .layout() .storage_nodes_of(hash); - if who.len() < manager.system.layout_manager.write_quorum() { + if who.len() + < manager + .system + .layout_manager + .write_quorum(Default::default()) + { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } who.retain(|id| *id != manager.system.id); diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index edeb88c0..7369ff85 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -30,7 +30,14 @@ impl AdminRpcHandler { let block_refs = self .garage .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) + .get_range( + Default::default(), + &hash, + None, + None, + 10000, + Default::default(), + ) .await?; let mut versions = vec![]; let mut uploads = vec![]; @@ -38,11 +45,16 @@ impl AdminRpcHandler { if let Some(v) = self .garage .version_table - .get(&br.version, &EmptyKey) + .get(Default::default(), &br.version, &EmptyKey) .await? { if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { - if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if let Some(u) = self + .garage + .mpu_table + .get(Default::default(), upload_id, &EmptyKey) + .await? + { uploads.push(u); } } @@ -108,14 +120,21 @@ impl AdminRpcHandler { let block_refs = self .garage .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) + .get_range( + Default::default(), + &hash, + None, + None, + 10000, + Default::default(), + ) .await?; for br in block_refs { if let Some(version) = self .garage .version_table - .get(&br.version, &EmptyKey) + .get(Default::default(), &br.version, &EmptyKey) .await? 
{ self.handle_block_purge_version_backlink( @@ -127,7 +146,10 @@ impl AdminRpcHandler { if !version.deleted.get() { let deleted_version = Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; + self.garage + .version_table + .insert(Default::default(), &deleted_version) + .await?; ver_dels += 1; } } @@ -152,11 +174,19 @@ impl AdminRpcHandler { let (bucket_id, key, ov_id) = match &version.backlink { VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if let Some(mut mpu) = self + .garage + .mpu_table + .get(Default::default(), upload_id, &EmptyKey) + .await? + { if !mpu.deleted.get() { mpu.parts.clear(); mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; + self.garage + .mpu_table + .insert(Default::default(), &mpu) + .await?; *mpu_dels += 1; } (mpu.bucket_id, mpu.key.clone(), *upload_id) @@ -166,7 +196,12 @@ impl AdminRpcHandler { } }; - if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { + if let Some(object) = self + .garage + .object_table + .get(Default::default(), &bucket_id, &key) + .await? + { let ov = object.versions().iter().rev().find(|v| v.is_complete()); if let Some(ov) = ov { if ov.uuid == ov_id { @@ -180,7 +215,10 @@ impl AdminRpcHandler { state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); - self.garage.object_table.insert(&deleted_object).await?; + self.garage + .object_table + .insert(Default::default(), &deleted_object) + .await?; *obj_dels += 1; } } diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 6b1190f8..eb9fde77 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -39,6 +39,7 @@ impl AdminRpcHandler { .garage .bucket_table .get_range( + (), &EmptyKey, None, Some(DeletedFilter::NotDeleted), @@ -63,12 +64,14 @@ impl AdminRpcHandler { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.as_option().unwrap(); + let c = *bucket_params.consistency_mode.get(); let counters = self .garage .object_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(c, &bucket_id, &EmptyKey) .await? .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); @@ -77,42 +80,28 @@ impl AdminRpcHandler { .garage .mpu_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(c, &bucket_id, &EmptyKey) .await? .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); - for (k, _) in bucket - .state - .as_option() - .unwrap() - .authorized_keys - .items() - .iter() - { + for (k, _) in bucket_params.authorized_keys.items().iter() { if let Some(key) = self .garage .key_table - .get(&EmptyKey, k) + .get((), &EmptyKey, k) .await? .filter(|k| !k.is_deleted()) { relevant_keys.insert(k.clone(), key); } } - for ((k, _), _, _) in bucket - .state - .as_option() - .unwrap() - .local_aliases - .items() - .iter() - { + for ((k, _), _, _) in bucket_params.local_aliases.items().iter() { if relevant_keys.contains_key(k) { continue; } - if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? { + if let Some(key) = self.garage.key_table.get((), &EmptyKey, k).await? 
{ relevant_keys.insert(k.clone(), key); } } @@ -136,7 +125,12 @@ impl AdminRpcHandler { let helper = self.garage.locked_helper().await; - if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? { + if let Some(alias) = self + .garage + .bucket_alias_table + .get((), &EmptyKey, name) + .await? + { if alias.state.get().is_some() { return Err(Error::BadRequest(format!("Bucket {} already exists", name))); } @@ -145,7 +139,7 @@ impl AdminRpcHandler { // ---- done checking, now commit ---- let bucket = Bucket::new(); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; helper.set_global_bucket_alias(bucket.id, name).await?; @@ -170,7 +164,7 @@ impl AdminRpcHandler { let bucket_alias = self .garage .bucket_alias_table - .get(&EmptyKey, &query.name) + .get((), &EmptyKey, &query.name) .await?; // Check bucket doesn't have other aliases @@ -225,7 +219,7 @@ impl AdminRpcHandler { // 3. delete bucket bucket.state = Deletable::delete(); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) } @@ -405,7 +399,7 @@ impl AdminRpcHandler { }; bucket_state.website_config.update(website); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; let msg = if query.allow { format!("Website access allowed for {}", &query.bucket) @@ -462,7 +456,7 @@ impl AdminRpcHandler { } bucket_state.quotas.update(quotas); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; Ok(AdminRpc::Ok(format!( "Quotas updated for {}", diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs index bd010d2c..66c9f8ed 100644 --- a/src/garage/admin/key.rs +++ b/src/garage/admin/key.rs @@ -28,6 +28,7 @@ impl AdminRpcHandler { .garage .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), @@ -57,7 +58,7 @@ impl AdminRpcHandler { async fn handle_create_key(&self, query: &KeyNewOpt) -> Result { let key = Key::new(&query.name); - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -71,7 +72,7 @@ impl AdminRpcHandler { .unwrap() .name .update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -106,7 +107,7 @@ impl AdminRpcHandler { if query.create_bucket { key.params_mut().unwrap().allow_create_bucket.update(true); } - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -119,7 +120,7 @@ impl AdminRpcHandler { if query.create_bucket { key.params_mut().unwrap().allow_create_bucket.update(false); } - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -128,14 +129,18 @@ impl AdminRpcHandler { return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. 
Add the --yes flag if you really want to re-import a key.".to_string())); } - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + let prev_key = self + .garage + .key_table + .get((), &EmptyKey, &query.key_id) + .await?; if prev_key.is_some() { return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); } let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name) .ok_or_bad_request("Invalid key format")?; - self.garage.key_table.insert(&imported_key).await?; + self.garage.key_table.insert((), &imported_key).await?; self.key_info_result(imported_key).await } @@ -151,7 +156,7 @@ impl AdminRpcHandler { .items() .iter() { - if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? { + if let Some(b) = self.garage.bucket_table.get((), &EmptyKey, id).await? { relevant_buckets.insert(*id, b); } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9e4de873..84d1038e 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -176,7 +176,7 @@ impl TableRepair for RepairVersions { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage .object_table - .get(bucket_id, key) + .get(Default::default(), bucket_id, key) .await? .map(|o| { o.versions().iter().any(|x| { @@ -186,7 +186,7 @@ impl TableRepair for RepairVersions { .unwrap_or(false), VersionBacklink::MultipartUpload { upload_id } => garage .mpu_table - .get(upload_id, &EmptyKey) + .get(Default::default(), upload_id, &EmptyKey) .await? .map(|u| !u.deleted.get()) .unwrap_or(false), @@ -196,7 +196,10 @@ impl TableRepair for RepairVersions { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table - .insert(&Version::new(version.uuid, version.backlink, true)) + .insert( + Default::default(), + &Version::new(version.uuid, version.backlink, true), + ) .await?; return Ok(true); } @@ -222,7 +225,7 @@ impl TableRepair for RepairBlockRefs { if !block_ref.deleted.get() { let ref_exists = garage .version_table - .get(&block_ref.version, &EmptyKey) + .get(Default::default(), &block_ref.version, &EmptyKey) .await? .map(|v| !v.deleted.get()) .unwrap_or(false); @@ -233,7 +236,10 @@ impl TableRepair for RepairBlockRefs { block_ref ); block_ref.deleted.set(); - garage.block_ref_table.insert(&block_ref).await?; + garage + .block_ref_table + .insert(Default::default(), &block_ref) + .await?; return Ok(true); } } @@ -258,7 +264,7 @@ impl TableRepair for RepairMpu { if !mpu.deleted.get() { let ref_exists = garage .object_table - .get(&mpu.bucket_id, &mpu.key) + .get(Default::default(), &mpu.bucket_id, &mpu.key) .await? 
.map(|o| { o.versions() @@ -274,7 +280,7 @@ impl TableRepair for RepairMpu { ); mpu.parts.clear(); mpu.deleted.set(); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(Default::default(), &mpu).await?; return Ok(true); } } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 1dbdfac2..d964b9fd 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,3 +1,4 @@ +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -119,7 +120,94 @@ mod v08 { impl garage_util::migrate::InitialFormat for Bucket {} } -pub use v08::*; +mod v011 { + use super::v08; + pub use super::v08::{ + BucketQuotas, CorsRule, LifecycleExpiration, LifecycleFilter, LifecycleRule, WebsiteConfig, + }; + use crate::permission::BucketKeyPerm; + use garage_rpc::replication_mode::ConsistencyMode; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// A bucket is a collection of objects + /// + /// Its parameters are not directly accessible as: + /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. + /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Bucket { + /// ID of the bucket + pub id: Uuid, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Deletable, + } + + /// Configuration for a bucket + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::Map, + + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, + + /// Whether to enable read-after-write consistency for this bucket + pub consistency_mode: crdt::Lww, + /// Whether this bucket is allowed for website access + /// (under all of its global alias names), + /// and if so, the website configuration XML document + pub website_config: crdt::Lww>, + /// CORS rules + pub cors_config: crdt::Lww>>, + /// Lifecycle configuration + #[serde(default)] + pub lifecycle_config: crdt::Lww>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww, + } + + impl garage_util::migrate::Migrate for Bucket { + const VERSION_MARKER: &'static [u8] = b"G011lh"; + + type Previous = v08::Bucket; + + fn migrate(previous: Self::Previous) -> Self { + Self { + id: previous.id, + state: match previous.state { + crdt::Deletable::Present(prev_state) => { + crdt::Deletable::Present(BucketParams { + creation_date: prev_state.creation_date, + authorized_keys: prev_state.authorized_keys, + aliases: prev_state.aliases, + local_aliases: prev_state.local_aliases, + website_config: prev_state.website_config, + cors_config: prev_state.cors_config, + lifecycle_config: prev_state.lifecycle_config, + quotas: prev_state.quotas, + consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent), + }) + } + crdt::Deletable::Deleted => crdt::Deletable::Deleted, + }, + } + } + } +} + +pub use v011::*; impl 
AutoCrdt for BucketQuotas { const WARN_IF_DIFFERENT: bool = true; @@ -133,6 +221,7 @@ impl BucketParams { authorized_keys: crdt::Map::new(), aliases: crdt::LwwMap::new(), local_aliases: crdt::LwwMap::new(), + consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), lifecycle_config: crdt::Lww::new(None), diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index a712d683..aec60de9 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -36,7 +36,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .filter(|x| !x.state.is_deleted()) .map(|_| bucket_id)) @@ -44,7 +44,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_alias_table - .get(&EmptyKey, bucket_name) + .get((), &EmptyKey, bucket_name) .await? .and_then(|x| *x.state.get())) } @@ -74,7 +74,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?) } @@ -86,7 +86,7 @@ impl<'a> BucketHelper<'a> { pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result { self.0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .filter(|b| !b.is_deleted()) .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id))) @@ -95,10 +95,20 @@ impl<'a> BucketHelper<'a> { // ---- pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result { + let consistency_mode = *self + .get_existing_bucket(bucket_id) + .await? + .state + .as_option() + .unwrap() + .consistency_mode + .get(); + let objects = self .0 .object_table .get_range( + consistency_mode, &bucket_id, None, Some(ObjectFilter::IsData), @@ -124,6 +134,7 @@ impl<'a> BucketHelper<'a> { .counter_table .table .get_range( + consistency_mode, &bucket_id, None, Some((DeletedFilter::NotDeleted, node_id_vec)), @@ -151,6 +162,12 @@ impl<'a> BucketHelper<'a> { older_than: Duration, ) -> Result { let older_than = now_msec() - older_than.as_millis() as u64; + let consistency_mode = self + .get_existing_bucket(*bucket_id) + .await? + .params() + .map(|params| *params.consistency_mode.get()) + .unwrap_or_default(); let mut ret = 0usize; let mut start = None; @@ -160,6 +177,7 @@ impl<'a> BucketHelper<'a> { .0 .object_table .get_range( + consistency_mode, bucket_id, start, Some(ObjectFilter::IsUploading { @@ -196,7 +214,10 @@ impl<'a> BucketHelper<'a> { .collect::>(); ret += abortions.len(); - self.0.object_table.insert_many(abortions).await?; + self.0 + .object_table + .insert_many(consistency_mode, abortions) + .await?; if objects.len() < 1000 { break; diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs index b8a99d55..203b2141 100644 --- a/src/model/helper/key.rs +++ b/src/model/helper/key.rs @@ -16,7 +16,7 @@ impl<'a> KeyHelper<'a> { Ok(self .0 .key_table - .get(&EmptyKey, key_id) + .get((), &EmptyKey, key_id) .await? .ok_or_message(format!("Key {} does not exist", key_id))?) } @@ -28,7 +28,7 @@ impl<'a> KeyHelper<'a> { pub async fn get_existing_key(&self, key_id: &String) -> Result { self.0 .key_table - .get(&EmptyKey, key_id) + .get((), &EmptyKey, key_id) .await? 
.filter(|b| !b.state.is_deleted()) .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string())) @@ -44,6 +44,7 @@ impl<'a> KeyHelper<'a> { .0 .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs index f8e06add..d1e8adc9 100644 --- a/src/model/helper/locked.rs +++ b/src/model/helper/locked.rs @@ -56,7 +56,11 @@ impl<'a> LockedHelper<'a> { let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?; - let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?; + let alias = self + .0 + .bucket_alias_table + .get((), &EmptyKey, alias_name) + .await?; if let Some(existing_alias) = alias.as_ref() { if let Some(p_bucket) = existing_alias.state.get() { @@ -88,10 +92,10 @@ impl<'a> LockedHelper<'a> { a } }; - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -112,7 +116,7 @@ impl<'a> LockedHelper<'a> { let mut alias = self .0 .bucket_alias_table - .get(&EmptyKey, alias_name) + .get((), &EmptyKey, alias_name) .await? .filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false)) .ok_or_message(format!( @@ -144,10 +148,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts alias.state = Lww::raw(alias_ts, None); - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -168,7 +172,7 @@ impl<'a> LockedHelper<'a> { let mut alias = self .0 .bucket_alias_table - .get(&EmptyKey, alias_name) + .get((), &EmptyKey, alias_name) .await? 
.ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?; @@ -186,12 +190,12 @@ impl<'a> LockedHelper<'a> { if alias.state.get() == &Some(bucket_id) { alias.state = Lww::raw(alias_ts, None); - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; } if let Some(bucket_state) = bucket.state.as_option_mut() { bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; } Ok(()) @@ -245,10 +249,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id)); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -317,10 +321,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -365,12 +369,12 @@ impl<'a> LockedHelper<'a> { if let Some(bstate) = bucket.state.as_option_mut() { bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; } if let Some(kstate) = key.state.as_option_mut() { kstate.authorized_buckets = Map::put_mutator(bucket_id, perm); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; } Ok(()) @@ -403,7 +407,7 @@ impl<'a> LockedHelper<'a> { // 3. 
Actually delete key key.state = Deletable::delete(); - self.0.key_table.insert(key).await?; + self.0.key_table.insert((), key).await?; Ok(()) } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index e15f2df8..140d52a6 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -23,6 +23,7 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::time::now_msec; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::system::System; use garage_rpc::*; @@ -43,8 +44,8 @@ const TIMESTAMP_KEY: &[u8] = b"timestamp"; #[derive(Debug, Serialize, Deserialize)] enum K2VRpc { Ok, - InsertItem(InsertedItem), - InsertManyItems(Vec), + InsertItem(ConsistencyMode, InsertedItem), + InsertManyItems(ConsistencyMode, Vec), PollItem { key: PollKey, causal_context: CausalContext, @@ -113,6 +114,7 @@ impl K2VRpcHandler { pub async fn insert( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, partition_key: String, sort_key: String, @@ -135,12 +137,15 @@ impl K2VRpcHandler { .try_call_many( &self.endpoint, &who, - K2VRpc::InsertItem(InsertedItem { - partition, - sort_key, - causal_context, - value, - }), + K2VRpc::InsertItem( + consistency_mode, + InsertedItem { + partition, + sort_key, + causal_context, + value, + }, + ), RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -150,6 +155,7 @@ impl K2VRpcHandler { pub async fn insert_batch( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, items: Vec<(String, String, Option, DvvsValue)>, ) -> Result<(), Error> { @@ -189,7 +195,7 @@ impl K2VRpcHandler { .try_call_many( &self.endpoint, &nodes[..], - K2VRpc::InsertManyItems(items), + K2VRpc::InsertManyItems(consistency_mode, items), RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -206,6 +212,7 @@ impl K2VRpcHandler { pub async fn poll_item( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, partition_key: String, sort_key: String, @@ -235,7 +242,12 @@ impl K2VRpcHandler { timeout_msec, }, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.item_table.data.replication.read_quorum()) + .with_quorum( + self.item_table + .data + .replication + .read_quorum(consistency_mode), + ) .send_all_at_once(true) .without_timeout(), ); @@ -266,6 +278,7 @@ impl K2VRpcHandler { pub async fn poll_range( &self, + consistency_mode: ConsistencyMode, range: PollRange, seen_str: Option, timeout_msec: u64, @@ -288,7 +301,11 @@ impl K2VRpcHandler { .data .replication .read_nodes(&range.partition.hash()); - let quorum = self.item_table.data.replication.read_quorum(); + let quorum = self + .item_table + .data + .replication + .read_quorum(consistency_mode); let msg = K2VRpc::PollRange { range, seen_str, @@ -376,7 +393,11 @@ impl K2VRpcHandler { // ---- internal handlers ---- - async fn handle_insert(&self, item: &InsertedItem) -> Result { + async fn handle_insert( + &self, + c: ConsistencyMode, + item: &InsertedItem, + ) -> Result { let new = { let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); self.local_insert(&local_timestamp_tree, item)? 
@@ -384,13 +405,17 @@ impl K2VRpcHandler { // Propagate to rest of network if let Some(updated) = new { - self.item_table.insert(&updated).await?; + self.item_table.insert(c, &updated).await?; } Ok(K2VRpc::Ok) } - async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { + async fn handle_insert_many( + &self, + c: ConsistencyMode, + items: &[InsertedItem], + ) -> Result { let mut updated_vec = vec![]; { @@ -406,7 +431,7 @@ impl K2VRpcHandler { // Propagate to rest of network if !updated_vec.is_empty() { - self.item_table.insert_many(&updated_vec).await?; + self.item_table.insert_many(c, &updated_vec).await?; } Ok(K2VRpc::Ok) @@ -546,8 +571,8 @@ impl K2VRpcHandler { impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { - K2VRpc::InsertItem(item) => self.handle_insert(item).await, - K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, + K2VRpc::InsertItem(c, item) => self.handle_insert(*c, item).await, + K2VRpc::InsertManyItems(c, items) => self.handle_insert_many(*c, &items[..]).await, K2VRpc::PollItem { key, causal_context, diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 8528382a..b03cf2fb 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use garage_rpc::replication_mode::ConsistencyMode; use garage_util::crdt::*; use garage_util::data::*; use garage_util::encode::nonversioned_decode; @@ -71,19 +72,23 @@ impl Migrate { self.garage .bucket_table - .insert(&Bucket { - id: bucket_id, - state: Deletable::Present(BucketParams { - creation_date: now_msec(), - authorized_keys: Map::new(), - aliases: LwwMap::new(), - local_aliases: LwwMap::new(), - website_config: Lww::new(website), - cors_config: Lww::new(None), - lifecycle_config: Lww::new(None), - quotas: Lww::new(Default::default()), - }), - }) + .insert( + (), + &Bucket { + id: bucket_id, + state: Deletable::Present(BucketParams { + creation_date: now_msec(), + authorized_keys: Map::new(), + aliases: LwwMap::new(), + local_aliases: LwwMap::new(), + consistency_mode: Lww::new(ConsistencyMode::Consistent), + website_config: Lww::new(website), + cors_config: Lww::new(None), + lifecycle_config: Lww::new(None), + quotas: Lww::new(Default::default()), + }), + }, + ) .await?; helper.set_global_bucket_alias(bucket_id, &new_name).await?; diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 50d4283f..9feb484d 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -253,7 +253,7 @@ async fn process_object( _ => { match garage .bucket_table - .get(&EmptyKey, &object.bucket_id) + .get((), &EmptyKey, &object.bucket_id) .await? 
{ Some(b) => b, diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 846eea47..ef05415a 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -139,12 +139,14 @@ impl LayoutManager { } } - pub fn read_quorum(self: &Arc) -> usize { - self.replication_factor.read_quorum(self.consistency_mode) + pub fn read_quorum(self: &Arc, bucket_consistency_mode: ConsistencyMode) -> usize { + self.replication_factor + .read_quorum(bucket_consistency_mode.min(self.consistency_mode)) } - pub fn write_quorum(self: &Arc) -> usize { - self.replication_factor.write_quorum(self.consistency_mode) + pub fn write_quorum(self: &Arc, bucket_consistency_mode: ConsistencyMode) -> usize { + self.replication_factor + .write_quorum(bucket_consistency_mode.min(self.consistency_mode)) } // ---- ACK LOCKING ---- diff --git a/src/table/queue.rs b/src/table/queue.rs index ffe0a4a7..39f0a75f 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -51,7 +51,7 @@ impl Worker for InsertQueueWorker { return Ok(WorkerState::Idle); } - self.0.insert_many(values).await?; + self.0.insert_many(Default::default(), values).await?; self.0.data.insert_queue.db().transaction(|tx| { for (k, v) in kv_pairs.iter() { diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 1e52bb47..4dd76f10 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -25,6 +25,7 @@ pub struct TableFullReplication { impl TableReplication for TableFullReplication { type WriteSets = Vec>; + type ConsistencyParam = (); fn storage_nodes(&self, _hash: &Hash) -> Vec { let layout = self.system.cluster_layout(); @@ -34,14 +35,14 @@ impl TableReplication for TableFullReplication { fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } - fn read_quorum(&self) -> usize { + fn read_quorum(&self, _: ()) -> usize { 1 } fn write_sets(&self, hash: &Hash) -> Self::WriteSets { vec![self.storage_nodes(hash)] } - fn write_quorum(&self) -> usize { + fn write_quorum(&self, _: ()) -> usize { let nmembers = self.system.cluster_layout().current().all_nodes().len(); let max_faults = if nmembers > 1 { 1 } else { 0 }; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 682c1ea6..6edfd352 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -4,6 +4,7 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { type WriteSets: AsRef>> + AsMut>> + Send + Sync + 'static; + type ConsistencyParam: Send + Default; // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods @@ -14,12 +15,12 @@ pub trait TableReplication: Send + Sync + 'static { /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull - fn read_quorum(&self) -> usize; + fn read_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize; /// Which nodes to send writes to fn write_sets(&self, hash: &Hash) -> Self::WriteSets; /// Responses needed to consider a write succesfull in each set - fn write_quorum(&self) -> usize; + fn write_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize; // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index fa5e48d7..40a3ccfa 100644 --- a/src/table/replication/sharded.rs +++ 
@@ -2,6 +2,7 @@ use std::sync::Arc;
 
 use garage_rpc::layout::manager::LayoutManager;
 use garage_rpc::layout::*;
+use garage_rpc::replication_mode::*;
 use garage_util::data::*;
 
 use crate::replication::*;
@@ -20,6 +21,7 @@ pub struct TableShardedReplication {
 
 impl TableReplication for TableShardedReplication {
 	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+	type ConsistencyParam = ConsistencyMode;
 
 	fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
 		self.layout_manager.layout().storage_nodes_of(hash)
@@ -28,15 +30,15 @@ impl TableReplication for TableShardedReplication {
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
 		self.layout_manager.layout().read_nodes_of(hash)
 	}
-	fn read_quorum(&self) -> usize {
-		self.layout_manager.read_quorum()
+	fn read_quorum(&self, c: ConsistencyMode) -> usize {
+		self.layout_manager.read_quorum(c)
 	}
 
 	fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
 		self.layout_manager.write_sets_of(hash)
 	}
-	fn write_quorum(&self) -> usize {
-		self.layout_manager.write_quorum()
+	fn write_quorum(&self, c: ConsistencyMode) -> usize {
+		self.layout_manager.write_quorum(c)
 	}
 
 	fn partition_of(&self, hash: &Hash) -> Partition {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index cd080df0..ab96c493 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 		);
 		let mut result_tracker = QuorumSetResultTracker::new(
 			&partition.storage_sets,
-			self.data.replication.write_quorum(),
+			self.data.replication.write_quorum(Default::default()),
 		);
 
 		let mut sync_futures = result_tracker
@@ -190,7 +190,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
 				);
 				break;
 			}
-			if nodes.len() < self.data.replication.write_quorum() {
+			if nodes.len() < self.data.replication.write_quorum(Default::default()) {
 				return Err(Error::Message(
 					"Not offloading as we don't have a quorum of nodes to write to."
 						.to_string(),
diff --git a/src/table/table.rs b/src/table/table.rs
index a5be2910..8e5a6bb4 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -104,11 +104,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		bg.spawn_worker(InsertQueueWorker(self.clone()));
 	}
 
-	pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+	pub async fn insert(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
 		let tracer = opentelemetry::global::tracer("garage_table");
 		let span = tracer.start(format!("{} insert", F::TABLE_NAME));
 
-		self.insert_internal(e)
+		self.insert_internal(c, e)
 			.bound_record_duration(&self.data.metrics.put_request_duration)
 			.with_context(Context::current_with_span(span))
 			.await?;
@@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		Ok(())
 	}
 
-	async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
+	async fn insert_internal(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
 		let hash = e.partition_key().hash();
 
 		let who = self.data.replication.write_sets(&hash);
@@ -132,7 +132,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 				who.as_ref(),
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.write_quorum()),
+					.with_quorum(self.data.replication.write_quorum(c)),
 			)
 			.await?;
 
@@ -144,7 +144,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		self.data.queue_insert(tx, e)
 	}
 
-	pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
+	pub async fn insert_many<I, IE>(
+		self: &Arc<Self>,
+		c: R::ConsistencyParam,
+		entries: I,
+	) -> Result<(), Error>
 	where
 		I: IntoIterator<Item = IE> + Send + Sync,
 		IE: Borrow<F::E> + Send + Sync,
@@ -152,7 +156,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		let tracer = opentelemetry::global::tracer("garage_table");
 		let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
 
-		self.insert_many_internal(entries)
+		self.insert_many_internal(c, entries)
 			.bound_record_duration(&self.data.metrics.put_request_duration)
 			.with_context(Context::current_with_span(span))
 			.await?;
@@ -162,7 +166,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		Ok(())
 	}
 
-	async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
+	async fn insert_many_internal<I, IE>(
+		self: &Arc<Self>,
+		c: R::ConsistencyParam,
+		entries: I,
+	) -> Result<(), Error>
 	where
 		I: IntoIterator<Item = IE> + Send + Sync,
 		IE: Borrow<F::E> + Send + Sync,
@@ -181,7 +189,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		// a quorum of nodes has answered OK, then the insert has succeeded and
 		// consistency properties (read-after-write) are preserved.
 
-		let quorum = self.data.replication.write_quorum();
+		let quorum = self.data.replication.write_quorum(c);
 
 		// Serialize all entries and compute the write sets for each of them.
 		// In the case of sharded table replication, this also takes an "ack lock"
@@ -283,6 +291,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 
 	pub async fn get(
 		self: &Arc<Self>,
+		c: R::ConsistencyParam,
 		partition_key: &F::P,
 		sort_key: &F::S,
 	) -> Result<Option<F::E>, Error> {
@@ -290,7 +299,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 		let span = tracer.start(format!("{} get", F::TABLE_NAME));
 
 		let res = self
-			.get_internal(partition_key, sort_key)
+			.get_internal(c, partition_key, sort_key)
 			.bound_record_duration(&self.data.metrics.get_request_duration)
 			.with_context(Context::current_with_span(span))
 			.await?;
@@ -302,6 +311,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 
 	async fn get_internal(
 		self: &Arc<Self>,
+		c: R::ConsistencyParam,
 		partition_key: &F::P,
 		sort_key: &F::S,
 	) -> Result<Option<F::E>, Error> {
@@ -317,7 +327,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum()),
+					.with_quorum(self.data.replication.read_quorum(c)),
 			)
 			.await?;
 
@@ -359,6 +369,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 
 	pub async fn get_range(
 		self: &Arc<Self>,
+		c: R::ConsistencyParam,
 		partition_key: &F::P,
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
@@ -370,6 +381,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 
 		let res = self
 			.get_range_internal(
+				c,
 				partition_key,
 				begin_sort_key,
 				filter,
@@ -387,6 +399,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 
 	async fn get_range_internal(
 		self: &Arc<Self>,
+		c: R::ConsistencyParam,
 		partition_key: &F::P,
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
@@ -412,7 +425,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
 				&who,
 				rpc,
 				RequestStrategy::with_priority(PRIO_NORMAL)
-					.with_quorum(self.data.replication.read_quorum(c)),
+					.with_quorum(self.data.replication.read_quorum(c)),
 			)
 			.await?;
 
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 69939f65..813e5a47 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -28,6 +28,7 @@ use garage_api::s3::error::{
 };
 use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
 
+use garage_model::bucket_table::BucketParams;
 use garage_model::garage::Garage;
 
 use garage_table::*;
@@ -182,11 +183,20 @@ impl WebServer {
 		}
 	}
 
-	async fn check_key_exists(self: &Arc<Self>, bucket_id: Uuid, key: &str) -> Result<bool, Error> {
+	async fn check_key_exists(
+		self: &Arc<Self>,
+		bucket_id: Uuid,
+		bucket_params: &BucketParams,
+		key: &str,
+	) -> Result<bool, Error> {
 		let exists = self
 			.garage
 			.object_table
-			.get(&bucket_id, &key.to_string())
+			.get(
+				*bucket_params.consistency_mode.get(),
+				&bucket_id,
+				&key.to_string(),
+			)
 			.await?
 			.map(|object| object.versions().iter().any(|v| v.is_data()))
 			.unwrap_or(false);
@@ -211,7 +221,7 @@ impl WebServer {
 		let bucket_id = self
 			.garage
 			.bucket_alias_table
-			.get(&EmptyKey, &bucket_name.to_string())
+			.get((), &EmptyKey, &bucket_name.to_string())
 			.await?
 			.and_then(|x| x.state.take())
 			.ok_or(Error::NotFound)?;
@@ -246,13 +256,22 @@ impl WebServer {
 				.map_err(ApiError::from)
 				.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
 			Method::HEAD => {
-				handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
+				handle_head_without_ctx(
+					self.garage.clone(),
+					req,
+					bucket_id,
+					&bucket_params,
+					&key,
+					None,
+				)
+				.await
 			}
 			Method::GET => {
 				handle_get_without_ctx(
 					self.garage.clone(),
 					req,
 					bucket_id,
+					&bucket_params,
 					&key,
 					None,
 					Default::default(),
@@ -265,7 +284,9 @@ impl WebServer {
 		// Try implicit redirect on error
 		let ret_doc_with_redir = match (&ret_doc, may_redirect) {
 			(Err(ApiError::NoSuchKey), ImplicitRedirect::To { key, url })
-				if self.check_key_exists(bucket_id, key.as_str()).await? =>
+				if self
+					.check_key_exists(bucket_id, &bucket_params, key.as_str())
+					.await? =>
 			{
 				Ok(Response::builder()
 					.status(StatusCode::FOUND)
@@ -306,6 +327,7 @@ impl WebServer {
 					self.garage.clone(),
 					&req2,
 					bucket_id,
+					&bucket_params,
 					&error_document,
 					None,
 					Default::default(),
-- 
2.43.4