diff --git a/Cargo.lock b/Cargo.lock
index 6032cdf8..1c66d7f8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1404,7 +1404,6 @@ dependencies = [
  "garage_db",
  "garage_net",
  "garage_rpc",
- "garage_table",
  "garage_util",
  "hex",
  "opentelemetry",
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index a214dfa7..97b1fe0d 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -169,8 +169,7 @@ impl AdminApiServer {
         };
 
         if token_required {
-            verify_authorization(&self.garage, global_token_hash, auth_header, request.name())
-                .await?;
+            verify_authorization(&self.garage, global_token_hash, auth_header, request.name())?;
         }
 
         match request {
@@ -245,7 +244,7 @@ fn hash_bearer_token(token: &str) -> String {
         .to_string()
 }
 
-async fn verify_authorization(
+fn verify_authorization(
     garage: &Garage,
     global_token_hash: Option<&str>,
     auth_header: Option<HeaderValue>,
@@ -271,8 +270,7 @@ async fn verify_authorization(
     let token_hash_string = if let Some((prefix, _)) = token.split_once('.') {
         garage
             .admin_token_table
-            .get(&EmptyKey, &prefix.to_string())
-            .await?
+            .get_local(&EmptyKey, &prefix.to_string())?
            .and_then(|k| k.state.into_option())
            .filter(|p| {
                p.expiration
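Note on the hunk above: `verify_authorization` loses its `async` because the admin token lookup now goes through the new `Table::get_local` (added at the bottom of this patch), which reads this node's local table copy instead of doing a quorum read over RPC. Below is a self-contained sketch of the prefix-based lookup it relies on; the token shape (`prefix.secret`) is inferred from the `split_once('.')` call, and the types are stand-ins, not the real Garage ones:

    use std::collections::HashMap;

    // Stand-in for the local copy of admin_token_table.
    type LocalTokenTable = HashMap<String, String>; // prefix -> stored token hash

    // Split a bearer token of the form "<prefix>.<secret>" and look the prefix
    // up locally; a missing entry means "unknown token" (or "not yet replicated
    // to this node", which is the consistency trade-off of a local read).
    fn find_token_hash<'a>(table: &'a LocalTokenTable, token: &str) -> Option<&'a String> {
        let (prefix, _secret) = token.split_once('.')?;
        table.get(prefix)
    }

    fn main() {
        let mut table = LocalTokenTable::new();
        table.insert("gat01".to_string(), "argon2-hash-here".to_string());
        assert!(find_token_hash(&table, "gat01.supersecret").is_some());
        assert!(find_token_hash(&table, "unknown.token").is_none());
    }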
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 7f89d4b2..a91940d7 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -79,18 +79,24 @@ impl RequestHandler for GetBucketInfoRequest {
         garage: &Arc<Garage>,
         _admin: &Admin,
     ) -> Result<GetBucketInfoResponse, Error> {
-        let bucket_id = match (self.id, self.global_alias, self.search) {
-            (Some(id), None, None) => parse_bucket_id(&id)?,
-            (None, Some(ga), None) => garage
-                .bucket_alias_table
-                .get(&EmptyKey, &ga)
-                .await?
-                .and_then(|x| *x.state.get())
-                .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
+        let bucket = match (self.id, self.global_alias, self.search) {
+            (Some(id), None, None) => {
+                let id = parse_bucket_id(&id)?;
+                garage.bucket_helper().get_existing_bucket(id).await?
+            }
+            (None, Some(ga), None) => {
+                let id = garage
+                    .bucket_alias_table
+                    .get(&EmptyKey, &ga)
+                    .await?
+                    .and_then(|x| *x.state.get())
+                    .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?;
+                garage.bucket_helper().get_existing_bucket(id).await?
+            }
             (None, None, Some(search)) => {
                 let helper = garage.bucket_helper();
-                if let Some(uuid) = helper.resolve_global_bucket_name(&search).await? {
-                    uuid
+                if let Some(bucket) = helper.resolve_global_bucket(&search).await? {
+                    bucket
                 } else {
                     let hexdec = if search.len() >= 2 {
                         search
@@ -124,7 +130,7 @@ impl RequestHandler for GetBucketInfoRequest {
                     if candidates.is_empty() {
                         return Err(Error::Common(CommonError::NoSuchBucket(search.clone())));
                     } else if candidates.len() == 1 {
-                        candidates.into_iter().next().unwrap().id
+                        candidates.into_iter().next().unwrap()
                     } else {
                         return Err(Error::bad_request(format!(
                             "Several matching buckets: {}",
@@ -140,23 +146,18 @@ impl RequestHandler for GetBucketInfoRequest {
             }
         };
 
-        bucket_info_results(garage, bucket_id).await
+        bucket_info_results(garage, bucket).await
     }
 }
 
 async fn bucket_info_results(
     garage: &Arc<Garage>,
-    bucket_id: Uuid,
+    bucket: Bucket,
 ) -> Result<GetBucketInfoResponse, Error> {
-    let bucket = garage
-        .bucket_helper()
-        .get_existing_bucket(bucket_id)
-        .await?;
-
     let counters = garage
         .object_counter_table
         .table
-        .get(&bucket_id, &EmptyKey)
+        .get(&bucket.id, &EmptyKey)
         .await?
         .map(|x| x.filtered_values(&garage.system.cluster_layout()))
         .unwrap_or_default();
@@ -164,7 +165,7 @@ async fn bucket_info_results(
     let mpu_counters = garage
         .mpu_counter_table
         .table
-        .get(&bucket_id, &EmptyKey)
+        .get(&bucket.id, &EmptyKey)
         .await?
         .map(|x| x.filtered_values(&garage.system.cluster_layout()))
         .unwrap_or_default();
@@ -336,7 +337,7 @@ impl RequestHandler for CreateBucketRequest {
         }
 
         Ok(CreateBucketResponse(
-            bucket_info_results(garage, bucket.id).await?,
+            bucket_info_results(garage, bucket).await?,
         ))
     }
 }
@@ -444,7 +445,7 @@ impl RequestHandler for UpdateBucketRequest {
         garage.bucket_table.insert(&bucket).await?;
 
         Ok(UpdateBucketResponse(
-            bucket_info_results(garage, bucket_id).await?,
+            bucket_info_results(garage, bucket).await?,
         ))
     }
 }
@@ -534,7 +535,7 @@ pub async fn handle_bucket_change_key_perm(
         .set_bucket_key_permissions(bucket.id, &key.key_id, perm)
         .await?;
 
-    bucket_info_results(garage, bucket.id).await
+    bucket_info_results(garage, bucket).await
 }
 
 // ---- BUCKET ALIASES ----
@@ -551,11 +552,11 @@ impl RequestHandler for AddBucketAliasRequest {
 
         let helper = garage.locked_helper().await;
 
-        match self.alias {
+        let bucket = match self.alias {
             BucketAliasEnum::Global { global_alias } => {
                 helper
                     .set_global_bucket_alias(bucket_id, &global_alias)
-                    .await?;
+                    .await?
             }
             BucketAliasEnum::Local {
                 local_alias,
                 access_key_id,
             } => {
                 helper
                     .set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
-                    .await?;
+                    .await?
             }
-        }
+        };
 
         Ok(AddBucketAliasResponse(
-            bucket_info_results(garage, bucket_id).await?,
+            bucket_info_results(garage, bucket).await?,
         ))
     }
 }
@@ -585,11 +586,11 @@ impl RequestHandler for RemoveBucketAliasRequest {
 
         let helper = garage.locked_helper().await;
 
-        match self.alias {
+        let bucket = match self.alias {
             BucketAliasEnum::Global { global_alias } => {
                 helper
                     .unset_global_bucket_alias(bucket_id, &global_alias)
-                    .await?;
+                    .await?
             }
             BucketAliasEnum::Local {
                 local_alias,
                 access_key_id,
             } => {
                 helper
                     .unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
-                    .await?;
+                    .await?
             }
-        }
+        };
 
         Ok(RemoveBucketAliasResponse(
-            bucket_info_results(garage, bucket_id).await?,
+            bucket_info_results(garage, bucket).await?,
        ))
    }
}
diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs
index 0ecf82bc..0a4e6705 100644
--- a/src/api/admin/special.rs
+++ b/src/api/admin/special.rs
@@ -151,12 +151,11 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error> {
         (domain.to_string(), true)
     };
 
-    let bucket_id = match garage
+    let bucket = match garage
         .bucket_helper()
-        .resolve_global_bucket_name(&bucket_name)
-        .await?
+        .resolve_global_bucket_fast(&bucket_name)?
     {
-        Some(bucket_id) => bucket_id,
+        Some(b) => b,
         None => return Ok(false),
     };
 
@@ -164,11 +163,6 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error> {
         return Ok(true);
     }
 
-    let bucket = garage
-        .bucket_helper()
-        .get_existing_bucket(bucket_id)
-        .await?;
-
     let bucket_state = bucket.state.as_option().unwrap();
     let bucket_website_config = bucket_state.website_config.get();
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs
index 09b55c13..a0ba6e48 100644
--- a/src/api/common/cors.rs
+++ b/src/api/common/cors.rs
@@ -9,9 +9,7 @@ use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode};
 use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule};
 use garage_model::garage::Garage;
 
-use crate::common_error::{
-    helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError,
-};
+use crate::common_error::{CommonError, OkOrBadRequest, OkOrInternalError};
 use crate::helpers::*;
 
 pub fn find_matching_cors_rule<'a, B>(
@@ -76,7 +74,7 @@ pub fn add_cors_headers(
     Ok(())
 }
 
-pub async fn handle_options_api(
+pub fn handle_options_api(
     garage: Arc<Garage>,
     req: &Request<IncomingBody>,
     bucket_name: Option<String>,
@@ -93,16 +91,8 @@ pub async fn handle_options_api(
     // OPTIONS calls are not authenticated).
     if let Some(bn) = bucket_name {
         let helper = garage.bucket_helper();
-        let bucket_id = helper
-            .resolve_global_bucket_name(&bn)
-            .await
-            .map_err(helper_error_as_internal)?;
-        if let Some(id) = bucket_id {
-            let bucket = garage
-                .bucket_helper()
-                .get_existing_bucket(id)
-                .await
-                .map_err(helper_error_as_internal)?;
+        let bucket_opt = helper.resolve_global_bucket_fast(&bn)?;
+        if let Some(bucket) = bucket_opt {
             let bucket_params = bucket.state.into_option().unwrap();
             handle_options_for_bucket(req, &bucket_params)
         } else {
diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs
index 50fbd304..6f1748c3 100644
--- a/src/api/common/signature/mod.rs
+++ b/src/api/common/signature/mod.rs
@@ -64,12 +64,12 @@ pub struct VerifiedRequest {
     pub content_sha256_header: ContentSha256Header,
 }
 
-pub async fn verify_request(
+pub fn verify_request(
     garage: &Garage,
     mut req: Request<ReqBody>,
     service: &'static str,
 ) -> Result<VerifiedRequest, Error> {
-    let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?;
+    let checked_signature = payload::check_payload_signature(&garage, &mut req, service)?;
 
    let request = streaming::parse_streaming_body(
        req,
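The whole signature-checking path can become synchronous because every lookup it performs is now local. The cost, spelled out in the helper doc comments later in this patch, is the loss of read-after-write consistency: a key or alias created through a quorum write may not yet be visible in this node's local table copy. A toy model of that window, with two maps standing in for two replicas of the key table:

    use std::collections::HashMap;

    fn main() {
        // Two replicas of the same table; a recent write has reached
        // replica_a but has not yet been propagated to replica_b.
        let mut replica_a: HashMap<&str, &str> = HashMap::new();
        let replica_b: HashMap<&str, &str> = HashMap::new();
        replica_a.insert("GK123", "new-access-key");

        // A quorum read would query several replicas and see the key.
        let quorum_read = replica_a.get("GK123").or_else(|| replica_b.get("GK123"));
        assert!(quorum_read.is_some());

        // A local read on the lagging node does not see it yet: a request
        // signed with a freshly created key can transiently be refused.
        let local_read = replica_b.get("GK123");
        assert!(local_read.is_none());
    }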
diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs
index 2d5f8603..8386607d 100644
--- a/src/api/common/signature/payload.rs
+++ b/src/api/common/signature/payload.rs
@@ -32,7 +32,7 @@ pub struct CheckedSignature {
     pub signature_header: Option<String>,
 }
 
-pub async fn check_payload_signature(
+pub fn check_payload_signature(
     garage: &Garage,
     request: &mut Request<IncomingBody>,
     service: &'static str,
@@ -43,9 +43,9 @@ pub async fn check_payload_signature(
         // We check for presigned-URL-style authentication first, because
         // the browser or something else could inject an Authorization header
         // that is totally unrelated to AWS signatures.
-        check_presigned_signature(garage, service, request, query).await
+        check_presigned_signature(garage, service, request, query)
     } else if request.headers().contains_key(AUTHORIZATION) {
-        check_standard_signature(garage, service, request, query).await
+        check_standard_signature(garage, service, request, query)
     } else {
         // Unsigned (anonymous) request
         let content_sha256 = request
@@ -93,7 +93,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Header, Error> {
     }
 }
 
-async fn check_standard_signature(
+fn check_standard_signature(
     garage: &Garage,
     service: &'static str,
@@ -128,7 +128,7 @@ async fn check_standard_signature(
     trace!("canonical request:\n{}", canonical_request);
     trace!("string to sign:\n{}", string_to_sign);
 
-    let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
+    let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
 
     let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?;
 
@@ -139,7 +139,7 @@ async fn check_standard_signature(
     })
 }
 
-async fn check_presigned_signature(
+fn check_presigned_signature(
     garage: &Garage,
     service: &'static str,
     request: &mut Request<IncomingBody>,
@@ -178,7 +178,7 @@ async fn check_presigned_signature(
     trace!("canonical request (presigned url):\n{}", canonical_request);
     trace!("string to sign (presigned url):\n{}", string_to_sign);
 
-    let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
+    let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
 
     // In the page on presigned URLs, AWS specifies that if a signed query
     // parameter and a signed header of the same name have different values,
@@ -378,7 +378,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
     Ok(Utc.from_utc_datetime(&date))
 }
 
-pub async fn verify_v4(
+pub fn verify_v4(
     garage: &Garage,
     service: &str,
     auth: &Authorization,
@@ -391,8 +391,7 @@ pub async fn verify_v4(
 
     let key = garage
         .key_table
-        .get(&EmptyKey, &auth.key_id)
-        .await?
+        .get_local(&EmptyKey, &auth.key_id)?
         .filter(|k| !k.state.is_deleted())
         .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;
     let key_p = key.params().unwrap();
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index dfa22dd2..8ace37d4 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -77,25 +77,19 @@ impl ApiHandler for K2VApiServer {
         // The OPTIONS method is processed early, before we even check for an API key
         if let Endpoint::Options = endpoint {
             let options_res = handle_options_api(garage, &req, Some(bucket_name))
-                .await
                 .ok_or_bad_request("Error handling OPTIONS")?;
             return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
         }
 
-        let verified_request = verify_request(&garage, req, "k2v").await?;
+        let verified_request = verify_request(&garage, req, "k2v")?;
         let req = verified_request.request;
         let api_key = verified_request.access_key;
 
-        let bucket_id = garage
-            .bucket_helper()
-            .resolve_bucket(&bucket_name, &api_key)
-            .await
-            .map_err(pass_helper_error)?;
         let bucket = garage
             .bucket_helper()
-            .get_existing_bucket(bucket_id)
-            .await
-            .map_err(helper_error_as_internal)?;
+            .resolve_bucket_fast(&bucket_name, &api_key)
+            .map_err(pass_helper_error)?;
+        let bucket_id = bucket.id;
         let bucket_params = bucket.state.into_option().unwrap();
 
         let allowed = match endpoint.authorization_type() {
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index 257ff893..55737268 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -2,8 +2,8 @@ use err_derive::Error;
 use hyper::header::HeaderValue;
 use hyper::{HeaderMap, StatusCode};
 
+pub(crate) use garage_api_common::common_error::pass_helper_error;
 use garage_api_common::common_error::{commonErrorDerivative, CommonError};
-pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error};
 pub use garage_api_common::common_error::{
     CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
 };
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 4cca21ed..1c967d58 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -118,11 +118,11 @@ impl ApiHandler for S3ApiServer {
             return handle_post_object(garage, req, bucket_name.unwrap()).await;
         }
         if let Endpoint::Options = endpoint {
-            let options_res = handle_options_api(garage, &req, bucket_name).await?;
+            let options_res = handle_options_api(garage, &req, bucket_name)?;
             return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
         }
 
-        let verified_request = verify_request(&garage, req, "s3").await?;
+        let verified_request = verify_request(&garage, req, "s3")?;
         let req = verified_request.request;
         let api_key = verified_request.access_key;
 
@@ -140,15 +140,11 @@ impl ApiHandler for S3ApiServer {
             return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
         }
 
-        let bucket_id = garage
-            .bucket_helper()
-            .resolve_bucket(&bucket_name, &api_key)
-            .await
-            .map_err(pass_helper_error)?;
         let bucket = garage
             .bucket_helper()
-            .get_existing_bucket(bucket_id)
-            .await?;
+            .resolve_bucket_fast(&bucket_name, &api_key)
+            .map_err(pass_helper_error)?;
+        let bucket_id = bucket.id;
         let bucket_params = bucket.state.into_option().unwrap();
 
        let allowed = match endpoint.authorization_type() {
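In both API servers the pattern is identical: the old code resolved the name to a bucket id (one quorum read) and then fetched the bucket row (a second quorum read), while `resolve_bucket_fast` returns the full `Bucket` in one synchronous local lookup. A self-contained model of that refactor, with simplified stand-in types:

    use std::collections::HashMap;

    #[derive(Clone)]
    struct Bucket {
        id: u128,
        deleted: bool,
    }

    struct Tables {
        aliases: HashMap<String, u128>, // name -> bucket id
        buckets: HashMap<u128, Bucket>, // id -> bucket row
    }

    impl Tables {
        // Old shape: two lookups, caller gets an id and must fetch again.
        fn resolve_then_get(&self, name: &str) -> Option<Bucket> {
            let id = *self.aliases.get(name)?;
            self.buckets.get(&id).cloned()
        }

        // New shape: one entry point that filters tombstones and returns the row.
        fn resolve_fast(&self, name: &str) -> Option<Bucket> {
            self.aliases
                .get(name)
                .and_then(|id| self.buckets.get(id))
                .filter(|b| !b.deleted)
                .cloned()
        }
    }

    fn main() {
        let mut t = Tables { aliases: HashMap::new(), buckets: HashMap::new() };
        t.aliases.insert("my-bucket".into(), 42);
        t.buckets.insert(42, Bucket { id: 42, deleted: false });
        assert_eq!(t.resolve_then_get("my-bucket").map(|b| b.id), Some(42));
        assert_eq!(t.resolve_fast("my-bucket").map(|b| b.id), Some(42));
    }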
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 3a09e769..7b5d714f 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -143,21 +143,16 @@ pub async fn handle_create_bucket(
     let api_key = helper.key().get_existing_key(api_key_id).await?;
     let key_params = api_key.params().unwrap();
 
-    let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
-    {
-        Some(*bucket_id)
-    } else {
-        helper
-            .bucket()
-            .resolve_global_bucket_name(&bucket_name)
-            .await?
-    };
+    let existing_bucket = helper
+        .bucket()
+        .resolve_bucket(&bucket_name, &api_key.key_id)
+        .await?;
 
-    if let Some(bucket_id) = existing_bucket {
+    if let Some(bucket) = existing_bucket {
         // Check we have write or owner permission on the bucket,
         // in that case it's fine, return 200 OK, bucket exists;
         // otherwise return a forbidden error.
-        let kp = api_key.bucket_permissions(&bucket_id);
+        let kp = api_key.bucket_permissions(&bucket.id);
         if !(kp.allow_write || kp.allow_owner) {
             return Err(CommonError::BucketAlreadyExists.into());
         }
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 7c67a65d..8892d4ff 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -683,16 +683,15 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
-    let source_bucket_id = garage
-        .bucket_helper()
-        .resolve_bucket(&source_bucket.to_string(), api_key)
-        .await
-        .map_err(pass_helper_error)?;
+    let source_bucket = garage
+        .bucket_helper()
+        .resolve_bucket_fast(&source_bucket.to_string(), api_key)
+        .map_err(pass_helper_error)?;
diff --git a/src/block/manager.rs b/src/block/manager.rs
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -122,7 +120,7 @@ impl BlockManager {
     pub fn new(
         db: &db::Db,
         config: &Config,
-        replication: TableShardedReplication,
+        write_quorum: usize,
         system: Arc<System>,
     ) -> Result<Arc<Self>, Error> {
         // Load or compute layout, i.e. assignment of data blocks to the different data directories
@@ -166,7 +164,7 @@ impl BlockManager {
         let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
 
         let block_manager = Arc::new(Self {
-            replication,
+            write_quorum,
             data_layout: ArcSwap::new(Arc::new(data_layout)),
             data_layout_persister,
             data_fsync: config.data_fsync,
@@ -338,6 +336,19 @@ impl BlockManager {
         Err(err)
     }
 
+    /// Returns the set of nodes that should store a copy of a given block.
+    /// These are the nodes assigned to the block's hash in the current
+    /// layout version only: since blocks are immutable, we don't need to
+    /// do complex logic when several layout versions are active at once,
+    /// just move them directly to the new nodes.
+    pub(crate) fn storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
+        self.system
+            .cluster_layout()
+            .current()
+            .nodes_of(hash)
+            .collect()
+    }
+
     // ---- Public interface ----
 
     /// Ask nodes that might have a block for it, return it as a stream
@@ -370,7 +381,7 @@ impl BlockManager {
         prevent_compression: bool,
         order_tag: Option<OrderTag>,
     ) -> Result<(), Error> {
-        let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
+        let who = self.storage_nodes_of(&hash);
 
         let compression_level = self.compression_level.filter(|_| !prevent_compression);
         let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -400,7 +411,7 @@ impl BlockManager {
             put_block_rpc,
             RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
                 .with_drop_on_completion(permit)
-                .with_quorum(self.replication.write_quorum()),
+                .with_quorum(self.write_quorum),
         )
        .await?;
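`storage_nodes_of` deliberately consults only the current layout version, unlike table writes, which must target the write sets of every active version. The doc comment gives the reason: block contents are immutable, so two layout versions cannot hold diverging copies of the same block; blocks can simply be moved to their new nodes. A self-contained illustration of the difference between "union of all versions" and "current version only":

    fn main() {
        // Node sets assigned to one partition by two active layout versions
        // (e.g. during a rebalance from nodes {1,2,3} to nodes {2,3,4}).
        let versions: Vec<Vec<u64>> = vec![vec![1, 2, 3], vec![2, 3, 4]];

        // Mutable table entries are written to every active version (the
        // union), so no version misses an update during the transition.
        let mut union: Vec<u64> = versions.iter().flatten().copied().collect();
        union.sort();
        union.dedup();
        assert_eq!(union, vec![1, 2, 3, 4]);

        // Immutable blocks only need to live where the *current* (latest)
        // version puts them; the resync logic moves them there directly.
        let current = versions.last().unwrap().clone();
        assert_eq!(current, vec![2, 3, 4]);
    }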
diff --git a/src/block/resync.rs b/src/block/resync.rs
index b476a0b8..307f7c48 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -27,8 +27,6 @@ use garage_util::tranquilizer::Tranquilizer;
 use garage_rpc::system::System;
 use garage_rpc::*;
 
-use garage_table::replication::TableReplication;
-
 use crate::manager::*;
 
 // The delay between the time where a resync operation fails
@@ -377,11 +375,8 @@ impl BlockResyncManager {
             info!("Resync block {:?}: offloading and deleting", hash);
             let existing_path = existing_path.unwrap();
 
-            let mut who = manager
-                .system
-                .cluster_layout()
-                .current_storage_nodes_of(hash);
-            if who.len() < manager.replication.write_quorum() {
+            let mut who = manager.storage_nodes_of(hash);
+            if who.len() < manager.write_quorum {
                 return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
             }
             who.retain(|id| *id != manager.system.id);
@@ -463,10 +458,7 @@ impl BlockResyncManager {
 
             // First, check whether we are still supposed to store that
             // block in the latest cluster layout version.
-            let storage_nodes = manager
-                .system
-                .cluster_layout()
-                .current_storage_nodes_of(&hash);
+            let storage_nodes = manager.storage_nodes_of(&hash);
 
             if !storage_nodes.contains(&manager.system.id) {
                 info!(
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 95f7b577..a7e0b62b 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -154,13 +154,6 @@ impl Garage {
         info!("Initialize membership management system...");
         let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
 
-        let data_rep_param = TableShardedReplication {
-            system: system.clone(),
-            replication_factor: replication_factor.into(),
-            write_quorum: replication_factor.write_quorum(consistency_mode),
-            read_quorum: 1,
-        };
-
         let meta_rep_param = TableShardedReplication {
             system: system.clone(),
             replication_factor: replication_factor.into(),
@@ -173,7 +166,8 @@ impl Garage {
         };
 
         info!("Initialize block manager...");
-        let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
+        let block_write_quorum = replication_factor.write_quorum(consistency_mode);
+        let block_manager = BlockManager::new(&db, &config, block_write_quorum, system.clone())?;
        block_manager.register_bg_vars(&mut bg_vars);

        // ---- admin tables ----
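The next file is the heart of the patch: `BucketHelper` now exposes a fast, local-read family (`resolve_global_bucket_fast`, `resolve_bucket_fast`) for the request hot path, and a quorum-read family (`resolve_global_bucket`, `resolve_bucket`) for admin operations that need consistency. Both accept either a global alias or a full hex-encoded 32-byte bucket ID; the two forms can never collide because bucket names are at most 63 characters while the hex ID is 64. A self-contained sketch of that disambiguation rule (the enum and helper are illustrative, not Garage types):

    enum BucketName {
        HexId([u8; 32]),
        Alias(String),
    }

    fn classify(name: &str) -> BucketName {
        // Try to parse the name as a 64-char hex string first; anything that
        // does not decode to exactly 32 bytes is treated as an alias. Bucket
        // names are limited to 63 chars, so a valid alias can never parse here.
        let decode = |s: &str| -> Option<[u8; 32]> {
            if s.len() != 64 || !s.chars().all(|c| c.is_ascii_hexdigit()) {
                return None;
            }
            let mut id = [0u8; 32];
            for (i, byte) in id.iter_mut().enumerate() {
                *byte = u8::from_str_radix(&s[2 * i..2 * i + 2], 16).ok()?;
            }
            Some(id)
        };
        match decode(name) {
            Some(id) => BucketName::HexId(id),
            None => BucketName::Alias(name.to_string()),
        }
    }

    fn main() {
        assert!(matches!(classify("my-bucket"), BucketName::Alias(_)));
        let hex = "00".repeat(32);
        assert!(matches!(classify(&hex), BucketName::HexId(_)));
    }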
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index a712d683..ebbe95ca 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,7 +1,7 @@
 use std::time::Duration;
 
 use garage_util::data::*;
-use garage_util::error::OkOrMessage;
+use garage_util::error::{Error as GarageError, OkOrMessage};
 use garage_util::time::*;
 
 use garage_table::util::*;
@@ -16,61 +16,172 @@ pub struct BucketHelper<'a>(pub(crate) &'a Garage);
 
 #[allow(clippy::ptr_arg)]
 impl<'a> BucketHelper<'a> {
-    pub async fn resolve_global_bucket_name(
+    // ================
+    // Local functions to find buckets FAST.
+    // This is only for the fast path in API requests.
+    // They do not provide the read-after-write guarantee
+    // when used in conjunction with other operations that
+    // modify buckets and bucket aliases.
+    // ================
+
+    /// Return the bucket corresponding to a global bucket name, if it exists
+    /// (and is not a tombstone entry).
+    ///
+    /// The name can be of two forms:
+    /// 1. A global bucket alias
+    /// 2. The full ID of a bucket encoded in hex
+    ///
+    /// Note that there is no possible ambiguity between the two forms,
+    /// as the maximum length of a bucket name is 63 characters, and the full
+    /// hex id is 64 chars long.
+    ///
+    /// This will not do any network interaction to check the alias and
+    /// bucket tables, it will only check the local copy of the table.
+    /// As a consequence, it does not provide read-after-write guarantees.
+    pub fn resolve_global_bucket_fast(
         &self,
         bucket_name: &String,
-    ) -> Result<Option<Uuid>, Error> {
-        // Bucket names in Garage are aliases, true bucket identifiers
-        // are 32-byte UUIDs. This function resolves bucket names into
-        // their full identifier by looking up in the bucket_alias_table.
-        // This function also allows buckets to be identified by their
-        // full UUID (hex-encoded). Here, if the name to be resolved is a
-        // hex string of the correct length, it is directly parsed as a bucket
-        // identifier which is returned. There is no risk of this conflicting
-        // with an actual bucket name: bucket names are max 63 chars long by
-        // the AWS spec, and hex-encoded UUIDs are 64 chars long.
+    ) -> Result<Option<Bucket>, GarageError> {
         let hexbucket = hex::decode(bucket_name.as_str())
             .ok()
             .and_then(|by| Uuid::try_from(&by));
-        if let Some(bucket_id) = hexbucket {
-            Ok(self
-                .0
-                .bucket_table
-                .get(&EmptyKey, &bucket_id)
-                .await?
-                .filter(|x| !x.state.is_deleted())
-                .map(|_| bucket_id))
-        } else {
-            Ok(self
-                .0
-                .bucket_alias_table
-                .get(&EmptyKey, bucket_name)
-                .await?
-                .and_then(|x| *x.state.get()))
-        }
+        let bucket_id = match hexbucket {
+            Some(id) => id,
+            None => {
+                let alias = self
+                    .0
+                    .bucket_alias_table
+                    .get_local(&EmptyKey, bucket_name)?
+                    .and_then(|x| *x.state.get());
+                match alias {
+                    Some(id) => id,
+                    None => return Ok(None),
+                }
+            }
+        };
+        Ok(self
+            .0
+            .bucket_table
+            .get_local(&EmptyKey, &bucket_id)?
+            .filter(|x| !x.state.is_deleted()))
     }
 
+    /// Return the bucket corresponding to a bucket name from the perspective of
+    /// a given access key, if it exists (and is not a tombstone entry).
+    ///
+    /// The name can be of three forms:
+    /// 1. A global bucket alias
+    /// 2. A local bucket alias
+    /// 3. The full ID of a bucket encoded in hex
+    ///
+    /// This will not do any network interaction, it will only check the local
+    /// copy of the bucket and global alias table. It will also resolve local
+    /// aliases directly using the data provided in the `api_key` parameter.
+    /// As a consequence, it does not provide read-after-write guarantees.
+    ///
+    /// In case no such bucket is found, this function returns a NoSuchBucket error.
     #[allow(clippy::ptr_arg)]
-    pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> {
+    pub fn resolve_bucket_fast(
+        &self,
+        bucket_name: &String,
+        api_key: &Key,
+    ) -> Result<Bucket, Error> {
         let api_key_params = api_key
             .state
             .as_option()
             .ok_or_message("Key should not be deleted at this point")?;
 
-        if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
-            Ok(*bucket_id)
-        } else {
+        let bucket_opt =
+            if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
+                self.0
+                    .bucket_table
+                    .get_local(&EmptyKey, &bucket_id)?
+                    .filter(|x| !x.state.is_deleted())
+            } else {
+                self.resolve_global_bucket_fast(bucket_name)?
+            };
+        bucket_opt.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))
+    }
+
+    // ================
+    // Global functions that do quorum reads/writes,
+    // for admin operations.
+    // ================
+
+    /// This is the same as `resolve_global_bucket_fast`,
+    /// except that it does quorum reads to ensure consistency.
+    pub async fn resolve_global_bucket(
+        &self,
+        bucket_name: &String,
+    ) -> Result<Option<Bucket>, GarageError> {
+        let hexbucket = hex::decode(bucket_name.as_str())
+            .ok()
+            .and_then(|by| Uuid::try_from(&by));
+        let bucket_id = match hexbucket {
+            Some(id) => id,
+            None => {
+                let alias = self
+                    .0
+                    .bucket_alias_table
+                    .get(&EmptyKey, bucket_name)
+                    .await?
+                    .and_then(|x| *x.state.get());
+                match alias {
+                    Some(id) => id,
+                    None => return Ok(None),
+                }
+            }
+        };
+        Ok(self
+            .0
+            .bucket_table
+            .get(&EmptyKey, &bucket_id)
+            .await?
+            .filter(|x| !x.state.is_deleted()))
+    }
+
+    /// Return the bucket corresponding to a bucket name from the perspective of
+    /// a given access key, if it exists (and is not a tombstone entry).
+    ///
+    /// This is the same as `resolve_bucket_fast`, with the following differences:
+    ///
+    /// - this function does quorum reads to ensure consistency.
+    /// - this function fetches the Key entry from the key table to ensure up-to-date data
+    /// - this function returns None if the bucket is not found, instead of HelperError::NoSuchBucket
+    #[allow(clippy::ptr_arg)]
+    pub async fn resolve_bucket(
+        &self,
+        bucket_name: &String,
+        key_id: &String,
+    ) -> Result<Option<Bucket>, GarageError> {
+        let local_alias = self
+            .0
+            .key_table
+            .get(&EmptyKey, &key_id)
+            .await?
+            .and_then(|k| k.state.into_option())
+            .ok_or_else(|| GarageError::Message(format!("access key {} has been deleted", key_id)))?
+            .local_aliases
+            .get(bucket_name)
+            .copied()
+            .flatten();
+
+        if let Some(bucket_id) = local_alias {
             Ok(self
-                .resolve_global_bucket_name(bucket_name)
+                .0
+                .bucket_table
+                .get(&EmptyKey, &bucket_id)
                 .await?
-                .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?)
+                .filter(|x| !x.state.is_deleted()))
+        } else {
+            Ok(self.resolve_global_bucket(bucket_name).await?)
         }
     }
 
     /// Returns a Bucket if it is present in bucket table,
     /// even if it is in deleted state. Querying a non-existing
     /// bucket ID returns an internal error.
-    pub async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
+    pub(crate) async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
         Ok(self
             .0
             .bucket_table
diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs
index 482e91b0..9d6a8d36 100644
--- a/src/model/helper/locked.rs
+++ b/src/model/helper/locked.rs
@@ -6,6 +6,7 @@ use garage_util::time::*;
 use garage_table::util::*;
 
 use crate::bucket_alias_table::*;
+use crate::bucket_table::*;
 use crate::garage::Garage;
 use crate::helper::bucket::BucketHelper;
 use crate::helper::error::*;
@@ -56,7 +57,7 @@ impl<'a> LockedHelper<'a> {
         &self,
         bucket_id: Uuid,
         alias_name: &String,
-    ) -> Result<(), Error> {
+    ) -> Result<Bucket, Error> {
         if !is_valid_bucket_name(alias_name) {
             return Err(Error::InvalidBucketName(alias_name.to_string()));
         }
@@ -100,7 +101,7 @@ impl<'a> LockedHelper<'a> {
         bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
         self.0.bucket_table.insert(&bucket).await?;
 
-        Ok(())
+        Ok(bucket)
     }
 
     /// Unsets an alias for a bucket in global namespace.
@@ -112,7 +113,7 @@ impl<'a> LockedHelper<'a> {
         &self,
         bucket_id: Uuid,
         alias_name: &String,
-    ) -> Result<(), Error> {
+    ) -> Result<Bucket, Error> {
         let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
         let bucket_state = bucket.state.as_option_mut().unwrap();
 
@@ -156,7 +157,7 @@ impl<'a> LockedHelper<'a> {
         bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
         self.0.bucket_table.insert(&bucket).await?;
 
-        Ok(())
+        Ok(bucket)
     }
 
     /// Ensures a bucket does not have a certain global alias.
@@ -215,7 +216,7 @@ impl<'a> LockedHelper<'a> {
         bucket_id: Uuid,
         key_id: &String,
         alias_name: &String,
-    ) -> Result<(), Error> {
+    ) -> Result<Bucket, Error> {
         let key_helper = KeyHelper(self.0);
 
         if !is_valid_bucket_name(alias_name) {
@@ -257,7 +258,7 @@ impl<'a> LockedHelper<'a> {
         bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
         self.0.bucket_table.insert(&bucket).await?;
 
-        Ok(())
+        Ok(bucket)
     }
 
     /// Unsets an alias for a bucket in the local namespace of a key.
@@ -271,7 +272,7 @@ impl<'a> LockedHelper<'a> {
         bucket_id: Uuid,
         key_id: &String,
         alias_name: &String,
-    ) -> Result<(), Error> {
+    ) -> Result<Bucket, Error> {
         let key_helper = KeyHelper(self.0);
 
         let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
@@ -330,7 +331,7 @@ impl<'a> LockedHelper<'a> {
         bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
         self.0.bucket_table.insert(&bucket).await?;
 
-        Ok(())
+        Ok(bucket)
     }
 
     /// Sets permissions for a key on a bucket.
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 821f4549..ddc356b5 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -451,10 +451,7 @@ impl K2VRpcHandler {
 
         let mut value = self
             .item_table
-            .data
-            .read_entry(&key.partition, &key.sort_key)?
-            .map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
-            .transpose()?
+            .get_local(&key.partition, &key.sort_key)?
             .unwrap_or_else(|| {
                 K2VItem::new(
                     key.partition.bucket_id,
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index c08a5629..35746851 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -149,14 +149,27 @@ impl LayoutHelper {
         self.layout.as_ref().unwrap()
     }
 
+    /// Returns the current layout version
     pub fn current(&self) -> &LayoutVersion {
         self.inner().current()
     }
 
+    /// Returns all layout versions currently active in the cluster
     pub fn versions(&self) -> &[LayoutVersion] {
         &self.inner().versions
     }
 
+    /// Returns the latest layout version from which it is safe to read data,
+    /// i.e. the version whose version number is sync_map_min
+    pub fn read_version(&self) -> &LayoutVersion {
+        let sync_min = self.sync_map_min;
+        self.versions()
+            .iter()
+            .find(|x| x.version == sync_min)
+            .or(self.versions().last())
+            .unwrap()
+    }
+
     pub fn is_check_ok(&self) -> bool {
         self.is_check_ok
     }
@@ -181,6 +194,8 @@ impl LayoutHelper {
         self.sync_map_min
     }
 
+    // ---- helpers for layout synchronization ----
+
     pub fn sync_digest(&self) -> SyncLayoutDigest {
         SyncLayoutDigest {
             current: self.current().version,
@@ -189,50 +204,7 @@ impl LayoutHelper {
         }
     }
 
-    pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-        let sync_min = self.sync_map_min;
-        let version = self
-            .versions()
-            .iter()
-            .find(|x| x.version == sync_min)
-            .or(self.versions().last())
-            .unwrap();
-        version
-            .nodes_of(position, version.replication_factor)
-            .collect()
-    }
-
-    pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
-        self.versions()
-            .iter()
-            .map(|x| x.nodes_of(position, x.replication_factor).collect())
-            .collect()
-    }
-
-    pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-        let mut ret = vec![];
-        for version in self.versions().iter() {
-            ret.extend(version.nodes_of(position, version.replication_factor));
-        }
-        ret.sort();
-        ret.dedup();
-        ret
-    }
-
-    pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
-        let ver = self.current();
-        ver.nodes_of(position, ver.replication_factor).collect()
-    }
-
-    pub fn trackers_hash(&self) -> Hash {
-        self.trackers_hash
-    }
-
-    pub fn staging_hash(&self) -> Hash {
-        self.staging_hash
-    }
-
-    pub fn digest(&self) -> RpcLayoutDigest {
+    pub(crate) fn digest(&self) -> RpcLayoutDigest {
        RpcLayoutDigest {
            current_version: self.current().version,
            active_versions: self.versions().len(),
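`read_version` centralizes a rule that was previously buried in `read_nodes_of`: reads must go to the layout version numbered `sync_map_min`, i.e. the most recent version whose data is known to be fully synchronized, falling back to the newest version if that number is not found. A self-contained sketch of the selection logic:

    struct LayoutVersion {
        version: u64,
    }

    // Mirrors the selection in `read_version`: pick the version whose number
    // equals sync_map_min, or fall back to the last (newest) version.
    fn read_version(versions: &[LayoutVersion], sync_map_min: u64) -> &LayoutVersion {
        versions
            .iter()
            .find(|x| x.version == sync_map_min)
            .or(versions.last())
            .unwrap()
    }

    fn main() {
        let versions = vec![LayoutVersion { version: 7 }, LayoutVersion { version: 8 }];
        // Version 8 is still syncing: reads stay on version 7.
        assert_eq!(read_version(&versions, 7).version, 7);
        // Once sync completes, sync_map_min advances and reads move to 8.
        assert_eq!(read_version(&versions, 8).version, 8);
    }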
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 16c32fb2..1e6bc84b 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -180,9 +180,7 @@ impl LayoutHistory {
             // Determine set of nodes for partition p in layout version v.
             // Sort the node set to avoid duplicate computations.
-            let mut set = v
-                .nodes_of(&p_hash, v.replication_factor)
-                .collect::<Vec<Uuid>>();
+            let mut set = v.nodes_of(&p_hash).collect::<Vec<Uuid>>();
             set.sort();
 
             // If this set was already processed, skip it.
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 21907ec7..0c75742b 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -143,16 +143,19 @@ impl LayoutManager {
 
     // ---- ACK LOCKING ----
 
-    pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
+    pub fn write_lock_with<T, F>(self: &Arc<Self>, f: F) -> WriteLock<T>
+    where
+        F: FnOnce(&LayoutHelper) -> T,
+    {
         let layout = self.layout();
         let version = layout.current().version;
-        let nodes = layout.storage_sets_of(position);
+        let value = f(&layout);
         layout
             .ack_lock
             .get(&version)
             .unwrap()
             .fetch_add(1, Ordering::Relaxed);
 
-        WriteLock::new(version, self, nodes)
+        WriteLock::new(version, self, value)
     }
 
     // ---- INTERNALS ---
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 90a51de7..fdcccc46 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -114,9 +114,7 @@ impl LayoutVersion {
     }
 
     /// Return the n servers in which data for this hash should be replicated
-    pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
-        assert_eq!(n, self.replication_factor);
-
+    pub fn nodes_of(&self, position: &Hash) -> impl Iterator<Item = Uuid> + '_ {
         let data = &self.ring_assignment_data;
 
         let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 2505c2ce..87fff5d6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -573,7 +573,7 @@ impl RpcHelper {
         // Compute, for each layout version, the set of nodes that might store
         // the block, and put them in their preferred order as of `request_order`.
         let mut vernodes = layout.versions().iter().map(|ver| {
-            let nodes = ver.nodes_of(position, ver.replication_factor);
+            let nodes = ver.nodes_of(position);
             rpc_helper.request_order(layout.current(), nodes)
         });
 
@@ -607,7 +607,7 @@ impl RpcHelper {
         // Second step: add nodes of older layout versions
         let old_ver_iter = layout.inner().old_versions.iter().rev();
         for ver in old_ver_iter {
-            let nodes = ver.nodes_of(position, ver.replication_factor);
+            let nodes = ver.nodes_of(position);
             for node in rpc_helper.request_order(layout.current(), nodes) {
                 if !ret.contains(&node) {
                     ret.push(node);
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 198a5f6b..800b37f3 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -475,10 +475,7 @@ impl System {
         let mut partitions_quorum = 0;
         let mut partitions_all_ok = 0;
         for (_, hash) in partitions.iter() {
-            let mut write_sets = layout
-                .versions()
-                .iter()
-                .map(|x| x.nodes_of(hash, x.replication_factor));
+            let mut write_sets = layout.versions().iter().map(|x| x.nodes_of(hash));
             let has_quorum = write_sets
                 .clone()
                .all(|set| set.filter(|x| node_up(x)).count() >= quorum);
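`write_lock_with` generalizes the old `write_sets_of`: instead of always capturing the storage sets of a hash, it takes a closure that computes an arbitrary value from the layout while the ack counter for the current version is incremented. The old behaviour is recovered as `write_lock_with(|l| write_sets(l, hash))`, as the sharded replication below does, and the fullcopy strategy can now reuse the same locking for its whole-layout write sets. A minimal self-contained model of the pattern, with toy types standing in for Garage's `LayoutHelper` and `WriteLock<T>`:

    struct Layout {
        version: u64,
        nodes: Vec<u64>,
    }

    struct WriteLock<T> {
        version: u64, // the layout version whose ack counter was incremented
        value: T,     // whatever the caller computed under the lock
    }

    fn write_lock_with<T, F: FnOnce(&Layout) -> T>(layout: &Layout, f: F) -> WriteLock<T> {
        // (The real code also bumps layout.ack_lock[version] here.)
        WriteLock { version: layout.version, value: f(layout) }
    }

    fn main() {
        let layout = Layout { version: 3, nodes: vec![1, 2, 3] };
        // Sharded tables capture per-hash write sets; fullcopy captures all nodes.
        let lock = write_lock_with(&layout, |l| vec![l.nodes.clone()]);
        assert_eq!(lock.version, 3);
        assert_eq!(lock.value, vec![vec![1, 2, 3]]);
    }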
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 39e29580..2ede578e 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::time::Duration;
 
 use garage_rpc::layout::*;
 use garage_rpc::system::System;
@@ -24,29 +25,53 @@ pub struct TableFullReplication {
 }
 
 impl TableReplication for TableFullReplication {
-    type WriteSets = Vec<Vec<Uuid>>;
+    type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
+
+    // Do anti-entropy every 10 seconds.
+    // Compared to sharded tables, anti-entropy is much less costly as there is
+    // a single partition hash to exchange.
+    // Also, it's generally a much bigger problem for fullcopy tables to be out of sync.
+    const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10);
 
     fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
-        let layout = self.system.cluster_layout();
-        layout.current().all_nodes().to_vec()
+        self.system.cluster_layout().all_nodes().to_vec()
     }
 
     fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
-        vec![self.system.id]
+        self.system
+            .cluster_layout()
+            .read_version()
+            .all_nodes()
+            .to_vec()
     }
     fn read_quorum(&self) -> usize {
-        1
+        let layout = self.system.cluster_layout();
+        let nodes = layout.read_version().all_nodes();
+        nodes.len().div_euclid(2) + 1
     }
 
-    fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-        vec![self.storage_nodes(hash)]
+    fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
+        self.system.layout_manager.write_lock_with(write_sets)
     }
     fn write_quorum(&self) -> usize {
-        let nmembers = self.system.cluster_layout().current().all_nodes().len();
-        if nmembers < 3 {
-            1
+        let layout = self.system.cluster_layout();
+        let min_len = layout
+            .versions()
+            .iter()
+            .map(|x| x.all_nodes().len())
+            .min()
+            .unwrap();
+        let max_quorum = layout
+            .versions()
+            .iter()
+            .map(|x| x.all_nodes().len().div_euclid(2) + 1)
+            .max()
+            .unwrap();
+        if min_len < max_quorum {
+            warn!("Write quorum will not be respected for TableFullReplication operations due to multiple active layout versions with vastly different numbers of nodes");
+            min_len
         } else {
-            nmembers.div_euclid(2) + 1
+            max_quorum
         }
     }
 
@@ -56,15 +81,26 @@ impl TableReplication for TableFullReplication {
 
     fn sync_partitions(&self) -> SyncPartitions {
         let layout = self.system.cluster_layout();
-        let layout_version = layout.current().version;
+        let layout_version = layout.ack_map_min();
+
+        let partitions = vec![SyncPartition {
+            partition: 0u16,
+            first_hash: [0u8; 32].into(),
+            last_hash: [0xff; 32].into(),
+            storage_sets: write_sets(&layout),
+        }];
+
         SyncPartitions {
             layout_version,
-            partitions: vec![SyncPartition {
-                partition: 0u16,
-                first_hash: [0u8; 32].into(),
-                last_hash: [0xff; 32].into(),
-                storage_sets: vec![layout.current().all_nodes().to_vec()],
-            }],
+            partitions,
         }
     }
 }
+
+fn write_sets(layout: &LayoutHelper) -> Vec<Vec<Uuid>> {
+    layout
+        .versions()
+        .iter()
+        .map(|x| x.all_nodes().to_vec())
+        .collect()
+}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3649fad3..327f2cbf 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,3 +1,5 @@
+use std::time::Duration;
+
 use garage_rpc::layout::*;
 use garage_util::data::*;
 
@@ -5,6 +7,8 @@ use garage_util::data::*;
 pub trait TableReplication: Send + Sync + 'static {
     type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
 
+    const ANTI_ENTROPY_INTERVAL: Duration;
+
    // See examples in table_sharded.rs and table_fullcopy.rs
    // To understand various replication methods
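The new fullcopy `write_quorum` deserves a worked example. With layout versions of 2 and 5 nodes active at once, the majority quorum of the larger version (3) exceeds the size of the smaller one (2), so the quorum is clamped to 2 and the warning fires; with 4 and 5 nodes, max_quorum = 3 <= min_len = 4 and the majority quorum holds. A runnable transcription of that arithmetic:

    // Same computation as TableFullReplication::write_quorum, over the node
    // counts of all currently active layout versions.
    fn fullcopy_write_quorum(version_sizes: &[usize]) -> usize {
        let min_len = *version_sizes.iter().min().unwrap();
        let max_quorum = version_sizes.iter().map(|n| n / 2 + 1).max().unwrap();
        if min_len < max_quorum {
            min_len // quorum cannot be respected; degrade with a warning
        } else {
            max_quorum
        }
    }

    fn main() {
        assert_eq!(fullcopy_write_quorum(&[5]), 3);    // single version: majority
        assert_eq!(fullcopy_write_quorum(&[4, 5]), 3); // 3 <= 4: majority holds
        assert_eq!(fullcopy_write_quorum(&[2, 5]), 2); // clamped to smallest version
    }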
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index e0245949..2514d880 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::time::Duration;
 
 use garage_rpc::layout::*;
 use garage_rpc::system::System;
@@ -25,21 +26,37 @@ pub struct TableShardedReplication {
 }
 
 impl TableReplication for TableShardedReplication {
+    // Do anti-entropy every 10 minutes
+    const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
+
     type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
     fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-        self.system.cluster_layout().storage_nodes_of(hash)
+        let layout = self.system.cluster_layout();
+        let mut ret = vec![];
+        for version in layout.versions().iter() {
+            ret.extend(version.nodes_of(hash));
+        }
+        ret.sort();
+        ret.dedup();
+        ret
     }
 
     fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-        self.system.cluster_layout().read_nodes_of(hash)
+        self.system
+            .cluster_layout()
+            .read_version()
+            .nodes_of(hash)
+            .collect()
     }
     fn read_quorum(&self) -> usize {
         self.read_quorum
     }
 
     fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-        self.system.layout_manager.write_sets_of(hash)
+        self.system
+            .layout_manager
+            .write_lock_with(|l| write_sets(l, hash))
     }
     fn write_quorum(&self) -> usize {
         self.write_quorum
@@ -57,12 +74,11 @@ impl TableReplication for TableShardedReplication {
             .current()
             .partitions()
             .map(|(partition, first_hash)| {
-                let storage_sets = layout.storage_sets_of(&first_hash);
                 SyncPartition {
                     partition,
                     first_hash,
                     last_hash: [0u8; 32].into(), // filled in just after
-                    storage_sets,
+                    storage_sets: write_sets(&layout, &first_hash),
                 }
             })
             .collect::<Vec<_>>();
@@ -81,3 +97,11 @@ impl TableReplication for TableShardedReplication {
         }
     }
 }
+
+fn write_sets(layout: &LayoutHelper, hash: &Hash) -> Vec<Vec<Uuid>> {
+    layout
+        .versions()
+        .iter()
+        .map(|x| x.nodes_of(hash).collect())
+        .collect()
+}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 2d43b9fc..6c8ddb1d 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -27,9 +27,6 @@ use crate::merkle::*;
 use crate::replication::*;
 use crate::*;
 
-// Do anti-entropy every 10 minutes
-const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-
 pub struct TableSyncer<F: TableSchema, R: TableReplication> {
     system: Arc<System>,
     data: Arc<TableData<F, R>>,
@@ -514,7 +511,7 @@ impl SyncWorker {
             partitions.partitions.shuffle(&mut thread_rng());
             self.todo = Some(partitions);
-            self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
+            self.next_full_sync = Instant::now() + R::ANTI_ENTROPY_INTERVAL;
         }
     }
diff --git a/src/table/table.rs b/src/table/table.rs
index c96f4731..565d27a5 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -482,6 +482,15 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
         Ok(ret_vec)
     }
 
+    pub fn get_local(
+        self: &Arc<Self>,
+        partition_key: &F::P,
+        sort_key: &F::S,
+    ) -> Result<Option<F::E>, Error> {
+        let bytes = self.data.read_entry(partition_key, sort_key)?;
+        bytes.map(|b| self.data.decode_entry(&b)).transpose()
+    }
+
     // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
 
     async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
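Moving `ANTI_ENTROPY_INTERVAL` from a crate-level constant in sync.rs to an associated const on `TableReplication` is what lets fullcopy tables sync every 10 seconds while sharded tables keep their 10-minute cadence. A minimal self-contained sketch of the mechanism:

    use std::time::Duration;

    trait TableReplication {
        // Per-strategy anti-entropy cadence, resolved at compile time.
        const ANTI_ENTROPY_INTERVAL: Duration;
    }

    struct Sharded;
    impl TableReplication for Sharded {
        const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
    }

    struct FullCopy;
    impl TableReplication for FullCopy {
        const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10);
    }

    // Generic worker code reads the interval from its replication parameter,
    // exactly as SyncWorker does with R::ANTI_ENTROPY_INTERVAL in sync.rs.
    fn next_sync_in<R: TableReplication>(_r: &R) -> Duration {
        R::ANTI_ENTROPY_INTERVAL
    }

    fn main() {
        assert_eq!(next_sync_in(&Sharded).as_secs(), 600);
        assert_eq!(next_sync_in(&FullCopy).as_secs(), 10);
    }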