Merge pull request 'read/write quorums on admin operations' (#997) from admin-quorums into next-v2
All checks were successful
ci/woodpecker/push/debug Pipeline was successful
ci/woodpecker/pr/debug Pipeline was successful

Reviewed-on: #997
This commit is contained in: next-v2
Alex 2025-03-25 16:09:06 +00:00
commit c6d6cc1fc3
31 changed files with 385 additions and 281 deletions

Cargo.lock generated
View file

@ -1404,7 +1404,6 @@ dependencies = [
"garage_db",
"garage_net",
"garage_rpc",
"garage_table",
"garage_util",
"hex",
"opentelemetry",

View file

@ -169,8 +169,7 @@ impl AdminApiServer {
};
if token_required {
verify_authorization(&self.garage, global_token_hash, auth_header, request.name())
.await?;
verify_authorization(&self.garage, global_token_hash, auth_header, request.name())?;
}
match request {
@ -245,7 +244,7 @@ fn hash_bearer_token(token: &str) -> String {
.to_string()
}
async fn verify_authorization(
fn verify_authorization(
garage: &Garage,
global_token_hash: Option<&str>,
auth_header: Option<hyper::http::HeaderValue>,
@ -271,8 +270,7 @@ async fn verify_authorization(
let token_hash_string = if let Some((prefix, _)) = token.split_once('.') {
garage
.admin_token_table
.get(&EmptyKey, &prefix.to_string())
.await?
.get_local(&EmptyKey, &prefix.to_string())?
.and_then(|k| k.state.into_option())
.filter(|p| {
p.expiration

View file

@ -79,18 +79,24 @@ impl RequestHandler for GetBucketInfoRequest {
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
.bucket_alias_table
.get(&EmptyKey, &ga)
.await?
.and_then(|x| *x.state.get())
.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
let bucket = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => {
let id = parse_bucket_id(&id)?;
garage.bucket_helper().get_existing_bucket(id).await?
}
(None, Some(ga), None) => {
let id = garage
.bucket_alias_table
.get(&EmptyKey, &ga)
.await?
.and_then(|x| *x.state.get())
.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?;
garage.bucket_helper().get_existing_bucket(id).await?
}
(None, None, Some(search)) => {
let helper = garage.bucket_helper();
if let Some(uuid) = helper.resolve_global_bucket_name(&search).await? {
uuid
if let Some(bucket) = helper.resolve_global_bucket(&search).await? {
bucket
} else {
let hexdec = if search.len() >= 2 {
search
@ -124,7 +130,7 @@ impl RequestHandler for GetBucketInfoRequest {
if candidates.is_empty() {
return Err(Error::Common(CommonError::NoSuchBucket(search.clone())));
} else if candidates.len() == 1 {
candidates.into_iter().next().unwrap().id
candidates.into_iter().next().unwrap()
} else {
return Err(Error::bad_request(format!(
"Several matching buckets: {}",
@ -140,23 +146,18 @@ impl RequestHandler for GetBucketInfoRequest {
}
};
bucket_info_results(garage, bucket_id).await
bucket_info_results(garage, bucket).await
}
}
async fn bucket_info_results(
garage: &Arc<Garage>,
bucket_id: Uuid,
bucket: Bucket,
) -> Result<GetBucketInfoResponse, Error> {
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let counters = garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(&bucket.id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -164,7 +165,7 @@ async fn bucket_info_results(
let mpu_counters = garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(&bucket.id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -336,7 +337,7 @@ impl RequestHandler for CreateBucketRequest {
}
Ok(CreateBucketResponse(
bucket_info_results(garage, bucket.id).await?,
bucket_info_results(garage, bucket).await?,
))
}
}
@ -444,7 +445,7 @@ impl RequestHandler for UpdateBucketRequest {
garage.bucket_table.insert(&bucket).await?;
Ok(UpdateBucketResponse(
bucket_info_results(garage, bucket_id).await?,
bucket_info_results(garage, bucket).await?,
))
}
}
@ -534,7 +535,7 @@ pub async fn handle_bucket_change_key_perm(
.set_bucket_key_permissions(bucket.id, &key.key_id, perm)
.await?;
bucket_info_results(garage, bucket.id).await
bucket_info_results(garage, bucket).await
}
// ---- BUCKET ALIASES ----
@ -551,11 +552,11 @@ impl RequestHandler for AddBucketAliasRequest {
let helper = garage.locked_helper().await;
match self.alias {
let bucket = match self.alias {
BucketAliasEnum::Global { global_alias } => {
helper
.set_global_bucket_alias(bucket_id, &global_alias)
.await?;
.await?
}
BucketAliasEnum::Local {
local_alias,
@ -563,12 +564,12 @@ impl RequestHandler for AddBucketAliasRequest {
} => {
helper
.set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
.await?;
.await?
}
}
};
Ok(AddBucketAliasResponse(
bucket_info_results(garage, bucket_id).await?,
bucket_info_results(garage, bucket).await?,
))
}
}
@ -585,11 +586,11 @@ impl RequestHandler for RemoveBucketAliasRequest {
let helper = garage.locked_helper().await;
match self.alias {
let bucket = match self.alias {
BucketAliasEnum::Global { global_alias } => {
helper
.unset_global_bucket_alias(bucket_id, &global_alias)
.await?;
.await?
}
BucketAliasEnum::Local {
local_alias,
@ -597,12 +598,12 @@ impl RequestHandler for RemoveBucketAliasRequest {
} => {
helper
.unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
.await?;
.await?
}
}
};
Ok(RemoveBucketAliasResponse(
bucket_info_results(garage, bucket_id).await?,
bucket_info_results(garage, bucket).await?,
))
}
}

View file

@ -151,12 +151,11 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
(domain.to_string(), true)
};
let bucket_id = match garage
let bucket = match garage
.bucket_helper()
.resolve_global_bucket_name(&bucket_name)
.await?
.resolve_global_bucket_fast(&bucket_name)?
{
Some(bucket_id) => bucket_id,
Some(b) => b,
None => return Ok(false),
};
@ -164,11 +163,6 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
return Ok(true);
}
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option().unwrap();
let bucket_website_config = bucket_state.website_config.get();

View file

@ -9,9 +9,7 @@ use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, Statu
use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule};
use garage_model::garage::Garage;
use crate::common_error::{
helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError,
};
use crate::common_error::{CommonError, OkOrBadRequest, OkOrInternalError};
use crate::helpers::*;
pub fn find_matching_cors_rule<'a, B>(
@ -76,7 +74,7 @@ pub fn add_cors_headers(
Ok(())
}
pub async fn handle_options_api(
pub fn handle_options_api(
garage: Arc<Garage>,
req: &Request<IncomingBody>,
bucket_name: Option<String>,
@ -93,16 +91,8 @@ pub async fn handle_options_api(
// OPTIONS calls are not authenticated).
if let Some(bn) = bucket_name {
let helper = garage.bucket_helper();
let bucket_id = helper
.resolve_global_bucket_name(&bn)
.await
.map_err(helper_error_as_internal)?;
if let Some(id) = bucket_id {
let bucket = garage
.bucket_helper()
.get_existing_bucket(id)
.await
.map_err(helper_error_as_internal)?;
let bucket_opt = helper.resolve_global_bucket_fast(&bn)?;
if let Some(bucket) = bucket_opt {
let bucket_params = bucket.state.into_option().unwrap();
handle_options_for_bucket(req, &bucket_params)
} else {

View file

@ -64,12 +64,12 @@ pub struct VerifiedRequest {
pub content_sha256_header: ContentSha256Header,
}
pub async fn verify_request(
pub fn verify_request(
garage: &Garage,
mut req: Request<IncomingBody>,
service: &'static str,
) -> Result<VerifiedRequest, Error> {
let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?;
let checked_signature = payload::check_payload_signature(&garage, &mut req, service)?;
let request = streaming::parse_streaming_body(
req,

View file

@ -32,7 +32,7 @@ pub struct CheckedSignature {
pub signature_header: Option<String>,
}
pub async fn check_payload_signature(
pub fn check_payload_signature(
garage: &Garage,
request: &mut Request<IncomingBody>,
service: &'static str,
@ -43,9 +43,9 @@ pub async fn check_payload_signature(
// We check for presigned-URL-style authentication first, because
// the browser or something else could inject an Authorization header
// that is totally unrelated to AWS signatures.
check_presigned_signature(garage, service, request, query).await
check_presigned_signature(garage, service, request, query)
} else if request.headers().contains_key(AUTHORIZATION) {
check_standard_signature(garage, service, request, query).await
check_standard_signature(garage, service, request, query)
} else {
// Unsigned (anonymous) request
let content_sha256 = request
@ -93,7 +93,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
}
}
async fn check_standard_signature(
fn check_standard_signature(
garage: &Garage,
service: &'static str,
request: &Request<IncomingBody>,
@ -128,7 +128,7 @@ async fn check_standard_signature(
trace!("canonical request:\n{}", canonical_request);
trace!("string to sign:\n{}", string_to_sign);
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?;
@ -139,7 +139,7 @@ async fn check_standard_signature(
})
}
async fn check_presigned_signature(
fn check_presigned_signature(
garage: &Garage,
service: &'static str,
request: &mut Request<IncomingBody>,
@ -178,7 +178,7 @@ async fn check_presigned_signature(
trace!("canonical request (presigned url):\n{}", canonical_request);
trace!("string to sign (presigned url):\n{}", string_to_sign);
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
// In the page on presigned URLs, AWS specifies that if a signed query
// parameter and a signed header of the same name have different values,
@ -378,7 +378,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
Ok(Utc.from_utc_datetime(&date))
}
pub async fn verify_v4(
pub fn verify_v4(
garage: &Garage,
service: &str,
auth: &Authorization,
@ -391,8 +391,7 @@ pub async fn verify_v4(
let key = garage
.key_table
.get(&EmptyKey, &auth.key_id)
.await?
.get_local(&EmptyKey, &auth.key_id)?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;
let key_p = key.params().unwrap();

View file

@ -77,25 +77,19 @@ impl ApiHandler for K2VApiServer {
// The OPTIONS method is processed early, before we even check for an API key
if let Endpoint::Options = endpoint {
let options_res = handle_options_api(garage, &req, Some(bucket_name))
.await
.ok_or_bad_request("Error handling OPTIONS")?;
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let verified_request = verify_request(&garage, req, "k2v").await?;
let verified_request = verify_request(&garage, req, "k2v")?;
let req = verified_request.request;
let api_key = verified_request.access_key;
let bucket_id = garage
.bucket_helper()
.resolve_bucket(&bucket_name, &api_key)
.await
.map_err(pass_helper_error)?;
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await
.map_err(helper_error_as_internal)?;
.resolve_bucket_fast(&bucket_name, &api_key)
.map_err(pass_helper_error)?;
let bucket_id = bucket.id;
let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {

View file

@ -2,8 +2,8 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
pub(crate) use garage_api_common::common_error::pass_helper_error;
use garage_api_common::common_error::{commonErrorDerivative, CommonError};
pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error};
pub use garage_api_common::common_error::{
CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
};

View file

@ -118,11 +118,11 @@ impl ApiHandler for S3ApiServer {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
if let Endpoint::Options = endpoint {
let options_res = handle_options_api(garage, &req, bucket_name).await?;
let options_res = handle_options_api(garage, &req, bucket_name)?;
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let verified_request = verify_request(&garage, req, "s3").await?;
let verified_request = verify_request(&garage, req, "s3")?;
let req = verified_request.request;
let api_key = verified_request.access_key;
@ -140,15 +140,11 @@ impl ApiHandler for S3ApiServer {
return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
}
let bucket_id = garage
.bucket_helper()
.resolve_bucket(&bucket_name, &api_key)
.await
.map_err(pass_helper_error)?;
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
.resolve_bucket_fast(&bucket_name, &api_key)
.map_err(pass_helper_error)?;
let bucket_id = bucket.id;
let bucket_params = bucket.state.into_option().unwrap();
let allowed = match endpoint.authorization_type() {

View file

@ -143,21 +143,16 @@ pub async fn handle_create_bucket(
let api_key = helper.key().get_existing_key(api_key_id).await?;
let key_params = api_key.params().unwrap();
let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
{
Some(*bucket_id)
} else {
helper
.bucket()
.resolve_global_bucket_name(&bucket_name)
.await?
};
let existing_bucket = helper
.bucket()
.resolve_bucket(&bucket_name, &api_key.key_id)
.await?;
if let Some(bucket_id) = existing_bucket {
if let Some(bucket) = existing_bucket {
// Check we have write or owner permission on the bucket,
// in that case it's fine, return 200 OK, bucket exists;
// otherwise return a forbidden error.
let kp = api_key.bucket_permissions(&bucket_id);
let kp = api_key.bucket_permissions(&bucket.id);
if !(kp.allow_write || kp.allow_owner) {
return Err(CommonError::BucketAlreadyExists.into());
}

View file

@ -683,16 +683,15 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
let source_bucket_id = garage
let source_bucket = garage
.bucket_helper()
.resolve_bucket(&source_bucket.to_string(), api_key)
.await
.resolve_bucket_fast(&source_bucket.to_string(), api_key)
.map_err(pass_helper_error)?;
if !api_key.allow_read(&source_bucket_id) {
if !api_key.allow_read(&source_bucket.id) {
return Err(Error::forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
"Reading from bucket {:?} not allowed for this key",
source_bucket.id
)));
}
@ -700,7 +699,7 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
let source_object = garage
.object_table
.get(&source_bucket_id, &source_key.to_string())
.get(&source_bucket.id, &source_key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;

View file

@ -104,22 +104,18 @@ pub async fn handle_post_object(
key.to_owned()
};
let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;
let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes())?;
let bucket_id = garage
let bucket = garage
.bucket_helper()
.resolve_bucket(&bucket_name, &api_key)
.await
.resolve_bucket_fast(&bucket_name, &api_key)
.map_err(pass_helper_error)?;
let bucket_id = bucket.id;
if !api_key.allow_write(&bucket_id) {
return Err(Error::forbidden("Operation is not allowed for this key."));
}
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.into_option().unwrap();
let matching_cors_rule = find_matching_cors_rule(
&bucket_params,

View file

@ -18,7 +18,6 @@ garage_db.workspace = true
garage_net.workspace = true
garage_rpc.workspace = true
garage_util.workspace = true
garage_table.workspace = true
opentelemetry.workspace = true

View file

@ -33,8 +33,6 @@ use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::layout::*;
use crate::metrics::*;
@ -74,8 +72,8 @@ impl Rpc for BlockRpc {
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
/// Quorum of nodes for write operations
pub write_quorum: usize,
/// Data layout
pub(crate) data_layout: ArcSwap<DataLayout>,
@ -122,7 +120,7 @@ impl BlockManager {
pub fn new(
db: &db::Db,
config: &Config,
replication: TableShardedReplication,
write_quorum: usize,
system: Arc<System>,
) -> Result<Arc<Self>, Error> {
// Load or compute layout, i.e. assignment of data blocks to the different data directories
@ -166,7 +164,7 @@ impl BlockManager {
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
write_quorum,
data_layout: ArcSwap::new(Arc::new(data_layout)),
data_layout_persister,
data_fsync: config.data_fsync,
@ -338,6 +336,19 @@ impl BlockManager {
Err(err)
}
/// Returns the set of nodes that should store a copy of a given block.
/// These are the nodes assigned to the block's hash in the current
/// layout version only: since blocks are immutable, we don't need to
/// do complex logic when several layout versions are active at once,
/// just move them directly to the new nodes.
pub(crate) fn storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
self.system
.cluster_layout()
.current()
.nodes_of(hash)
.collect()
}
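Since blocks are immutable, writes here target only the current layout version's nodes, whereas mutable table entries (see the replication changes further down) are written to every active version's node set. A small standalone sketch of that difference, with purely illustrative node names:

fn main() {
    // Node sets of two active layout versions during a migration.
    let previous = vec!["A", "B", "C"];
    let current = vec!["B", "C", "D"];

    // New blocks go only to the current version's nodes; existing copies
    // are moved over by the resync/rebalance workers.
    let block_write_targets = current.clone();
    assert_eq!(block_write_targets, ["B", "C", "D"]);

    // Reads, however, may still need to ask nodes of older versions, which
    // is why the RPC read path further down in this diff iterates over the
    // node sets of all layout versions.
    let mut read_candidates: Vec<_> = current.iter().chain(previous.iter()).cloned().collect();
    read_candidates.sort();
    read_candidates.dedup();
    assert_eq!(read_candidates, ["A", "B", "C", "D"]);
}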
// ---- Public interface ----
/// Ask nodes that might have a block for it, return it as a stream
@ -370,7 +381,7 @@ impl BlockManager {
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
let who = self.storage_nodes_of(&hash);
let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@ -400,7 +411,7 @@ impl BlockManager {
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
.with_quorum(self.write_quorum),
)
.await?;

View file

@ -27,8 +27,6 @@ use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::TableReplication;
use crate::manager::*;
// The delay between the time where a resync operation fails
@ -377,11 +375,8 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
let mut who = manager
.system
.cluster_layout()
.current_storage_nodes_of(hash);
if who.len() < manager.replication.write_quorum() {
let mut who = manager.storage_nodes_of(hash);
if who.len() < manager.write_quorum {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
who.retain(|id| *id != manager.system.id);
@ -463,10 +458,7 @@ impl BlockResyncManager {
// First, check whether we are still supposed to store that
// block in the latest cluster layout version.
let storage_nodes = manager
.system
.cluster_layout()
.current_storage_nodes_of(&hash);
let storage_nodes = manager.storage_nodes_of(&hash);
if !storage_nodes.contains(&manager.system.id) {
info!(

View file

@ -154,13 +154,6 @@ impl Garage {
info!("Initialize membership management system...");
let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
replication_factor: replication_factor.into(),
write_quorum: replication_factor.write_quorum(consistency_mode),
read_quorum: 1,
};
let meta_rep_param = TableShardedReplication {
system: system.clone(),
replication_factor: replication_factor.into(),
@ -173,7 +166,8 @@ impl Garage {
};
info!("Initialize block manager...");
let block_manager = BlockManager::new(&db, &config, data_rep_param, system.clone())?;
let block_write_quorum = replication_factor.write_quorum(consistency_mode);
let block_manager = BlockManager::new(&db, &config, block_write_quorum, system.clone())?;
block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----

View file

@ -1,7 +1,7 @@
use std::time::Duration;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::time::*;
use garage_table::util::*;
@ -16,61 +16,172 @@ pub struct BucketHelper<'a>(pub(crate) &'a Garage);
#[allow(clippy::ptr_arg)]
impl<'a> BucketHelper<'a> {
pub async fn resolve_global_bucket_name(
// ================
// Local functions to find buckets FAST.
// This is only for the fast path in API requests.
// They do not provide the read-after-write guarantee
// when used in conjunction with other operations that
// modify buckets and bucket aliases.
// ================
/// Return bucket corresponding to global bucket name, if it exists
/// (and is not a tombstone entry).
///
/// The name can be of two forms:
/// 1. A global bucket alias
/// 2. The full ID of a bucket encoded in hex
///
/// Note that there is no possible ambiguity between the two forms,
/// as the maximum length of a bucket name is 63 characters, and the full
/// hex id is 64 chars long.
///
/// This will not do any network interaction to check the alias and
/// bucket tables, it will only check the local copy of the table.
/// As a consequence, it does not provide read-after-write guarantees.
pub fn resolve_global_bucket_fast(
&self,
bucket_name: &String,
) -> Result<Option<Uuid>, Error> {
// Bucket names in Garage are aliases, true bucket identifiers
// are 32-byte UUIDs. This function resolves bucket names into
// their full identifier by looking up in the bucket_alias_table.
// This function also allows buckets to be identified by their
// full UUID (hex-encoded). Here, if the name to be resolved is a
// hex string of the correct length, it is directly parsed as a bucket
// identifier which is returned. There is no risk of this conflicting
// with an actual bucket name: bucket names are max 63 chars long by
// the AWS spec, and hex-encoded UUIDs are 64 chars long.
) -> Result<Option<Bucket>, GarageError> {
let hexbucket = hex::decode(bucket_name.as_str())
.ok()
.and_then(|by| Uuid::try_from(&by));
if let Some(bucket_id) = hexbucket {
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.filter(|x| !x.state.is_deleted())
.map(|_| bucket_id))
} else {
Ok(self
.0
.bucket_alias_table
.get(&EmptyKey, bucket_name)
.await?
.and_then(|x| *x.state.get()))
}
let bucket_id = match hexbucket {
Some(id) => id,
None => {
let alias = self
.0
.bucket_alias_table
.get_local(&EmptyKey, bucket_name)?
.and_then(|x| *x.state.get());
match alias {
Some(id) => id,
None => return Ok(None),
}
}
};
Ok(self
.0
.bucket_table
.get_local(&EmptyKey, &bucket_id)?
.filter(|x| !x.state.is_deleted()))
}
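The hex-versus-alias disambiguation described in the comment above fits in a few lines of standalone code; the 32-byte ID size and the 63/64-character argument come from the surrounding comments, while the function itself is only illustrative:

// A bucket "name" is treated as a raw bucket ID if and only if it is a
// 64-character hex string: bucket names are at most 63 characters long
// (AWS rule), so the two forms can never collide.
fn parse_as_bucket_id(name: &str) -> Option<[u8; 32]> {
    if name.len() != 64 || !name.chars().all(|c| c.is_ascii_hexdigit()) {
        return None;
    }
    let mut id = [0u8; 32];
    for (i, byte) in id.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&name[2 * i..2 * i + 2], 16).ok()?;
    }
    Some(id)
}

// If this returns None, the name is looked up as a global alias instead.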
/// Return bucket corresponding to a bucket name from the perspective of
/// a given access key, if it exists (and is not a tombstone entry).
///
/// The name can be of three forms:
/// 1. A global bucket alias
/// 2. A local bucket alias
/// 3. The full ID of a bucket encoded in hex
///
/// This will not do any network interaction, it will only check the local
/// copy of the bucket and global alias table. It will also resolve local
/// aliases directly using the data provided in the `api_key` parameter.
/// As a consequence, it does not provide read-after-write guarantees.
///
/// In case no such bucket is found, this function returns a NoSuchBucket error.
#[allow(clippy::ptr_arg)]
pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> {
pub fn resolve_bucket_fast(
&self,
bucket_name: &String,
api_key: &Key,
) -> Result<Bucket, Error> {
let api_key_params = api_key
.state
.as_option()
.ok_or_message("Key should not be deleted at this point")?;
if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
Ok(*bucket_id)
} else {
let bucket_opt =
if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
self.0
.bucket_table
.get_local(&EmptyKey, &bucket_id)?
.filter(|x| !x.state.is_deleted())
} else {
self.resolve_global_bucket_fast(bucket_name)?
};
bucket_opt.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))
}
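The precedence implemented just above (the key's own local aliases first, then the global namespace) can be sketched independently of the table types; the maps and names below are illustrative stand-ins:

use std::collections::HashMap;

// Resolve a bucket name as one access key sees it: a local alias shadows
// a global one, and a missing local entry falls through to the global lookup.
fn resolve_for_key<'a>(
    local_aliases: &HashMap<&str, &'a str>,
    global_aliases: &HashMap<&str, &'a str>,
    name: &str,
) -> Option<&'a str> {
    local_aliases
        .get(name)
        .copied()
        .or_else(|| global_aliases.get(name).copied())
}

fn main() {
    let local = HashMap::from([("backups", "bucket-id-1")]);
    let global = HashMap::from([("backups", "bucket-id-2"), ("www", "bucket-id-3")]);
    assert_eq!(resolve_for_key(&local, &global, "backups"), Some("bucket-id-1"));
    assert_eq!(resolve_for_key(&local, &global, "www"), Some("bucket-id-3"));
}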
// ================
// Global functions that do quorum reads/writes,
// for admin operations.
// ================
/// This is the same as `resolve_global_bucket_fast`,
/// except that it does quorum reads to ensure consistency.
pub async fn resolve_global_bucket(
&self,
bucket_name: &String,
) -> Result<Option<Bucket>, GarageError> {
let hexbucket = hex::decode(bucket_name.as_str())
.ok()
.and_then(|by| Uuid::try_from(&by));
let bucket_id = match hexbucket {
Some(id) => id,
None => {
let alias = self
.0
.bucket_alias_table
.get(&EmptyKey, bucket_name)
.await?
.and_then(|x| *x.state.get());
match alias {
Some(id) => id,
None => return Ok(None),
}
}
};
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.filter(|x| !x.state.is_deleted()))
}
/// Return bucket corresponding to a bucket name from the perspective of
/// a given access key, if it exists (and is not a tombstone entry).
///
/// This is the same as `resolve_bucket_fast`, with the following differences:
///
/// - this function does quorum reads to ensure consistency.
/// - this function fetches the Key entry from the key table to ensure up-to-date data
/// - this function returns None if the bucket is not found, instead of HelperError::NoSuchBucket
#[allow(clippy::ptr_arg)]
pub async fn resolve_bucket(
&self,
bucket_name: &String,
key_id: &String,
) -> Result<Option<Bucket>, GarageError> {
let local_alias = self
.0
.key_table
.get(&EmptyKey, &key_id)
.await?
.and_then(|k| k.state.into_option())
.ok_or_else(|| GarageError::Message(format!("access key {} has been deleted", key_id)))?
.local_aliases
.get(bucket_name)
.copied()
.flatten();
if let Some(bucket_id) = local_alias {
Ok(self
.resolve_global_bucket_name(bucket_name)
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?)
.filter(|x| !x.state.is_deleted()))
} else {
Ok(self.resolve_global_bucket(bucket_name).await?)
}
}
/// Returns a Bucket if it is present in bucket table,
/// even if it is in deleted state. Querying a non-existing
/// bucket ID returns an internal error.
pub async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
pub(crate) async fn get_internal_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
Ok(self
.0
.bucket_table

View file

@ -6,6 +6,7 @@ use garage_util::time::*;
use garage_table::util::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
use crate::helper::bucket::BucketHelper;
use crate::helper::error::*;
@ -56,7 +57,7 @@ impl<'a> LockedHelper<'a> {
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
) -> Result<Bucket, Error> {
if !is_valid_bucket_name(alias_name) {
return Err(Error::InvalidBucketName(alias_name.to_string()));
}
@ -100,7 +101,7 @@ impl<'a> LockedHelper<'a> {
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
Ok(bucket)
}
/// Unsets an alias for a bucket in global namespace.
@ -112,7 +113,7 @@ impl<'a> LockedHelper<'a> {
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
) -> Result<Bucket, Error> {
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
@ -156,7 +157,7 @@ impl<'a> LockedHelper<'a> {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
Ok(bucket)
}
/// Ensures a bucket does not have a certain global alias.
@ -215,7 +216,7 @@ impl<'a> LockedHelper<'a> {
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
) -> Result<Bucket, Error> {
let key_helper = KeyHelper(self.0);
if !is_valid_bucket_name(alias_name) {
@ -257,7 +258,7 @@ impl<'a> LockedHelper<'a> {
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
Ok(bucket)
}
/// Unsets an alias for a bucket in the local namespace of a key.
@ -271,7 +272,7 @@ impl<'a> LockedHelper<'a> {
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
) -> Result<Bucket, Error> {
let key_helper = KeyHelper(self.0);
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
@ -330,7 +331,7 @@ impl<'a> LockedHelper<'a> {
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
Ok(bucket)
}
/// Sets permissions for a key on a bucket.

View file

@ -451,10 +451,7 @@ impl K2VRpcHandler {
let mut value = self
.item_table
.data
.read_entry(&key.partition, &key.sort_key)?
.map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
.transpose()?
.get_local(&key.partition, &key.sort_key)?
.unwrap_or_else(|| {
K2VItem::new(
key.partition.bucket_id,

View file

@ -149,14 +149,27 @@ impl LayoutHelper {
self.layout.as_ref().unwrap()
}
/// Returns the current layout version
pub fn current(&self) -> &LayoutVersion {
self.inner().current()
}
/// Returns all layout versions currently active in the cluster
pub fn versions(&self) -> &[LayoutVersion] {
&self.inner().versions
}
/// Returns the latest layout version for which it is safe to read data from,
/// i.e. the version whose version number is sync_map_min
pub fn read_version(&self) -> &LayoutVersion {
let sync_min = self.sync_map_min;
self.versions()
.iter()
.find(|x| x.version == sync_min)
.or(self.versions().last())
.unwrap()
}
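The selection rule is small enough to restate on its own; a minimal sketch, assuming version numbers are listed oldest to newest (as the fallback to versions().last() above suggests):

// Pick the layout version whose number equals sync_map_min; if it is no
// longer in the list (already pruned), fall back to the newest version,
// mirroring the .or(self.versions().last()) fallback above.
fn pick_read_version(versions: &[u64], sync_min: u64) -> u64 {
    versions
        .iter()
        .copied()
        .find(|v| *v == sync_min)
        .unwrap_or_else(|| *versions.last().unwrap())
}

fn main() {
    assert_eq!(pick_read_version(&[12, 13, 14], 13), 13);
    assert_eq!(pick_read_version(&[13, 14], 12), 14); // version 12 already pruned
}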
pub fn is_check_ok(&self) -> bool {
self.is_check_ok
}
@ -181,6 +194,8 @@ impl LayoutHelper {
self.sync_map_min
}
// ---- helpers for layout synchronization ----
pub fn sync_digest(&self) -> SyncLayoutDigest {
SyncLayoutDigest {
current: self.current().version,
@ -189,50 +204,7 @@ impl LayoutHelper {
}
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let sync_min = self.sync_map_min;
let version = self
.versions()
.iter()
.find(|x| x.version == sync_min)
.or(self.versions().last())
.unwrap();
version
.nodes_of(position, version.replication_factor)
.collect()
}
pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.versions()
.iter()
.map(|x| x.nodes_of(position, x.replication_factor).collect())
.collect()
}
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let mut ret = vec![];
for version in self.versions().iter() {
ret.extend(version.nodes_of(position, version.replication_factor));
}
ret.sort();
ret.dedup();
ret
}
pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let ver = self.current();
ver.nodes_of(position, ver.replication_factor).collect()
}
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}
pub fn staging_hash(&self) -> Hash {
self.staging_hash
}
pub fn digest(&self) -> RpcLayoutDigest {
pub(crate) fn digest(&self) -> RpcLayoutDigest {
RpcLayoutDigest {
current_version: self.current().version,
active_versions: self.versions().len(),

View file

@ -180,9 +180,7 @@ impl LayoutHistory {
// Determine set of nodes for partition p in layout version v.
// Sort the node set to avoid duplicate computations.
let mut set = v
.nodes_of(&p_hash, v.replication_factor)
.collect::<Vec<Uuid>>();
let mut set = v.nodes_of(&p_hash).collect::<Vec<Uuid>>();
set.sort();
// If this set was already processed, skip it.

View file

@ -143,16 +143,19 @@ impl LayoutManager {
// ---- ACK LOCKING ----
pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
pub fn write_lock_with<T, F>(self: &Arc<Self>, f: F) -> WriteLock<T>
where
F: FnOnce(&LayoutHelper) -> T,
{
let layout = self.layout();
let version = layout.current().version;
let nodes = layout.storage_sets_of(position);
let value = f(&layout);
layout
.ack_lock
.get(&version)
.unwrap()
.fetch_add(1, Ordering::Relaxed);
WriteLock::new(version, self, nodes)
WriteLock::new(version, self, value)
}
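What replaces write_sets_of is a more general helper: the caller's closure decides what to compute from the layout while an in-flight write is registered against the current version. A standalone sketch of the pattern, using illustrative stand-in types (not Garage's):

use std::sync::atomic::{AtomicUsize, Ordering};

struct Layout {
    version: u64,
    ack_lock: AtomicUsize, // counts writes still in flight against this version
}

struct WriteLock<T> {
    version: u64,
    value: T, // e.g. the write sets computed from the layout
}

// Compute a layout-derived value and register one in-flight write in the
// same step; in the real code, a non-zero counter keeps the node from
// acknowledging newer layout versions until the write completes.
fn write_lock_with<T>(layout: &Layout, f: impl FnOnce(&Layout) -> T) -> WriteLock<T> {
    let value = f(layout);
    layout.ack_lock.fetch_add(1, Ordering::Relaxed);
    WriteLock {
        version: layout.version,
        value,
    }
}

fn main() {
    let layout = Layout { version: 14, ack_lock: AtomicUsize::new(0) };
    let lock = write_lock_with(&layout, |l| format!("write sets for v{}", l.version));
    assert_eq!((lock.version, lock.value.as_str()), (14, "write sets for v14"));
    assert_eq!(layout.ack_lock.load(Ordering::Relaxed), 1);
}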
// ---- INTERNALS ---

View file

@ -114,9 +114,7 @@ impl LayoutVersion {
}
/// Return the n servers in which data for this hash should be replicated
pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
assert_eq!(n, self.replication_factor);
pub fn nodes_of(&self, position: &Hash) -> impl Iterator<Item = Uuid> + '_ {
let data = &self.ring_assignment_data;
let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {

View file

@ -573,7 +573,7 @@ impl RpcHelper {
// Compute, for each layout version, the set of nodes that might store
// the block, and put them in their preferred order as of `request_order`.
let mut vernodes = layout.versions().iter().map(|ver| {
let nodes = ver.nodes_of(position, ver.replication_factor);
let nodes = ver.nodes_of(position);
rpc_helper.request_order(layout.current(), nodes)
});
@ -607,7 +607,7 @@ impl RpcHelper {
// Second step: add nodes of older layout versions
let old_ver_iter = layout.inner().old_versions.iter().rev();
for ver in old_ver_iter {
let nodes = ver.nodes_of(position, ver.replication_factor);
let nodes = ver.nodes_of(position);
for node in rpc_helper.request_order(layout.current(), nodes) {
if !ret.contains(&node) {
ret.push(node);

View file

@ -475,10 +475,7 @@ impl System {
let mut partitions_quorum = 0;
let mut partitions_all_ok = 0;
for (_, hash) in partitions.iter() {
let mut write_sets = layout
.versions()
.iter()
.map(|x| x.nodes_of(hash, x.replication_factor));
let mut write_sets = layout.versions().iter().map(|x| x.nodes_of(hash));
let has_quorum = write_sets
.clone()
.all(|set| set.filter(|x| node_up(x)).count() >= quorum);

View file

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use garage_rpc::layout::*;
use garage_rpc::system::System;
@ -24,29 +25,53 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
type WriteSets = Vec<Vec<Uuid>>;
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
// Do anti-entropy every 10 seconds.
// Compared to sharded tables, anti-entropy is much less costly as there is
// a single partition hash to exchange.
// Also, it's generally a much bigger problem for fullcopy tables to be out of sync.
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10);
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout();
layout.current().all_nodes().to_vec()
self.system.cluster_layout().all_nodes().to_vec()
}
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
self.system
.cluster_layout()
.read_version()
.all_nodes()
.to_vec()
}
fn read_quorum(&self) -> usize {
1
let layout = self.system.cluster_layout();
let nodes = layout.read_version().all_nodes();
nodes.len().div_euclid(2) + 1
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
vec![self.storage_nodes(hash)]
fn write_sets(&self, _hash: &Hash) -> Self::WriteSets {
self.system.layout_manager.write_lock_with(write_sets)
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers < 3 {
1
let layout = self.system.cluster_layout();
let min_len = layout
.versions()
.iter()
.map(|x| x.all_nodes().len())
.min()
.unwrap();
let max_quorum = layout
.versions()
.iter()
.map(|x| x.all_nodes().len().div_euclid(2) + 1)
.max()
.unwrap();
if min_len < max_quorum {
warn!("Write quorum will not be respected for TableFullReplication operations due to multiple active layout versions with vastly different number of nodes");
min_len
} else {
nmembers.div_euclid(2) + 1
max_quorum
}
}
@ -56,15 +81,26 @@ impl TableReplication for TableFullReplication {
fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout();
let layout_version = layout.current().version;
let layout_version = layout.ack_map_min();
let partitions = vec![SyncPartition {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
storage_sets: write_sets(&layout),
}];
SyncPartitions {
layout_version,
partitions: vec![SyncPartition {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
storage_sets: vec![layout.current().all_nodes().to_vec()],
}],
partitions,
}
}
}
fn write_sets(layout: &LayoutHelper) -> Vec<Vec<Uuid>> {
layout
.versions()
.iter()
.map(|x| x.all_nodes().to_vec())
.collect()
}
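The effect of the new write_quorum for fullcopy tables is easiest to see with concrete node counts; a standalone worked sketch (the warning fallback matches the code above, the scenarios are illustrative):

// Majority quorum for a single layout version with n nodes.
fn majority(n: usize) -> usize {
    n.div_euclid(2) + 1
}

// Use the largest per-version majority, unless some active version has
// fewer nodes than that, in which case fall back to the smallest version
// size so that writes remain possible (the real code logs a warning).
fn fullcopy_write_quorum(node_counts: &[usize]) -> usize {
    let min_len = node_counts.iter().copied().min().unwrap();
    let max_quorum = node_counts.iter().copied().map(majority).max().unwrap();
    if min_len < max_quorum {
        min_len
    } else {
        max_quorum
    }
}

fn main() {
    // Single 3-node layout: a plain majority.
    assert_eq!(fullcopy_write_quorum(&[3]), 2);
    // Growing from 3 to 5 nodes: quorum 3 is still reachable in both versions.
    assert_eq!(fullcopy_write_quorum(&[3, 5]), 3);
    // Growing from 1 to 4 nodes: the old version cannot hold 3 copies,
    // so the quorum is capped at 1 and the warning is emitted.
    assert_eq!(fullcopy_write_quorum(&[1, 4]), 1);
}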

View file

@ -1,3 +1,5 @@
use std::time::Duration;
use garage_rpc::layout::*;
use garage_util::data::*;
@ -5,6 +7,8 @@ use garage_util::data::*;
pub trait TableReplication: Send + Sync + 'static {
type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
const ANTI_ENTROPY_INTERVAL: Duration;
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods

View file

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use garage_rpc::layout::*;
use garage_rpc::system::System;
@ -25,21 +26,37 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().storage_nodes_of(hash)
let layout = self.system.cluster_layout();
let mut ret = vec![];
for version in layout.versions().iter() {
ret.extend(version.nodes_of(hash));
}
ret.sort();
ret.dedup();
ret
}
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().read_nodes_of(hash)
self.system
.cluster_layout()
.read_version()
.nodes_of(hash)
.collect()
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
self.system.layout_manager.write_sets_of(hash)
self.system
.layout_manager
.write_lock_with(|l| write_sets(l, hash))
}
fn write_quorum(&self) -> usize {
self.write_quorum
@ -57,12 +74,11 @@ impl TableReplication for TableShardedReplication {
.current()
.partitions()
.map(|(partition, first_hash)| {
let storage_sets = layout.storage_sets_of(&first_hash);
SyncPartition {
partition,
first_hash,
last_hash: [0u8; 32].into(), // filled in just after
storage_sets,
storage_sets: write_sets(&layout, &first_hash),
}
})
.collect::<Vec<_>>();
@ -81,3 +97,11 @@ impl TableReplication for TableShardedReplication {
}
}
}
fn write_sets(layout: &LayoutHelper, hash: &Hash) -> Vec<Vec<Uuid>> {
layout
.versions()
.iter()
.map(|x| x.nodes_of(hash).collect())
.collect()
}
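For sharded tables the same split applies per partition: writes must reach their quorum in the node set of every active layout version, while reads only query the version chosen by read_version(). A standalone illustration with hypothetical node names:

fn main() {
    // Node sets for one partition in two active layout versions.
    let v13 = vec!["A", "B", "C"];
    let v14 = vec!["B", "C", "D"];

    // write_sets: one set per active version.
    let write_sets = vec![v13.clone(), v14.clone()];
    assert_eq!(write_sets.len(), 2);

    // storage_nodes: the deduplicated union of all versions' nodes,
    // i.e. everywhere a copy of the data may legitimately live.
    let mut storage_nodes: Vec<_> = v13.iter().chain(v14.iter()).cloned().collect();
    storage_nodes.sort();
    storage_nodes.dedup();
    assert_eq!(storage_nodes, ["A", "B", "C", "D"]);

    // read_nodes: only the version selected by read_version(), so that a
    // read quorum there can intersect that version's write quorum
    // (provided read_quorum + write_quorum > replication_factor).
    let read_nodes = v13;
    assert_eq!(read_nodes, ["A", "B", "C"]);
}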

View file

@ -27,9 +27,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@ -514,7 +511,7 @@ impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
partitions.partitions.shuffle(&mut thread_rng());
self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
self.next_full_sync = Instant::now() + R::ANTI_ENTROPY_INTERVAL;
}
}

View file

@ -482,6 +482,15 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(ret_vec)
}
pub fn get_local(
self: &Arc<Self>,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let bytes = self.data.read_entry(partition_key, sort_key)?;
bytes.map(|b| self.data.decode_entry(&b)).transpose()
}
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {