Per-Bucket Consistency
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful

This commit is contained in:
Yureka 2024-03-04 16:58:06 +01:00
parent 060f83edf1
commit 41a17ce14a
38 changed files with 658 additions and 243 deletions

View file

@ -10,6 +10,8 @@ use garage_util::time::*;
use garage_table::*;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@ -27,6 +29,7 @@ pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBod
let buckets = garage
.bucket_table
.get_range(
(),
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
@ -117,11 +120,13 @@ async fn bucket_info_results(
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option().unwrap();
let c = *bucket_state.consistency_mode.get();
let counters = garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -129,7 +134,7 @@ async fn bucket_info_results(
let mpu_counters = garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
.unwrap_or_default();
@ -145,7 +150,7 @@ async fn bucket_info_results(
{
if let Some(key) = garage
.key_table
.get(&EmptyKey, k)
.get((), &EmptyKey, k)
.await?
.filter(|k| !k.is_deleted())
{
@ -165,7 +170,7 @@ async fn bucket_info_results(
if relevant_keys.contains_key(k) {
continue;
}
if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
if let Some(key) = garage.key_table.get((), &EmptyKey, k).await? {
if !key.state.is_deleted() {
relevant_keys.insert(k.clone(), key);
}
@ -185,6 +190,7 @@ async fn bucket_info_results(
.filter(|(_, _, a)| *a)
.map(|(n, _, _)| n.to_string())
.collect::<Vec<_>>(),
consistency_mode: *state.consistency_mode.get(),
website_access: state.website_config.get().is_some(),
website_config: state.website_config.get().clone().map(|wsc| {
GetBucketInfoWebsiteResult {
@ -238,6 +244,7 @@ async fn bucket_info_results(
struct GetBucketInfoResult {
id: String,
global_aliases: Vec<String>,
consistency_mode: ConsistencyMode,
website_access: bool,
#[serde(default)]
website_config: Option<GetBucketInfoWebsiteResult>,
@ -283,7 +290,7 @@ pub async fn handle_create_bucket(
)));
}
if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
if let Some(alias) = garage.bucket_alias_table.get((), &EmptyKey, ga).await? {
if alias.state.get().is_some() {
return Err(CommonError::BucketAlreadyExists.into());
}
@ -306,7 +313,7 @@ pub async fn handle_create_bucket(
}
let bucket = Bucket::new();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
if let Some(ga) = &req.global_alias {
helper.set_global_bucket_alias(bucket.id, ga).await?;
@ -394,7 +401,7 @@ pub async fn handle_delete_bucket(
// 4. delete bucket
bucket.state = Deletable::delete();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -416,6 +423,10 @@ pub async fn handle_update_bucket(
let state = bucket.state.as_option_mut().unwrap();
if let Some(cm) = req.consistency_mode {
state.consistency_mode.update(cm);
}
if let Some(wa) = req.website_access {
if wa.enabled {
state.website_config.update(Some(WebsiteConfig {
@ -441,7 +452,7 @@ pub async fn handle_update_bucket(
});
}
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
bucket_info_results(garage, bucket_id).await
}
@ -449,6 +460,7 @@ pub async fn handle_update_bucket(
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateBucketRequest {
consistency_mode: Option<ConsistencyMode>,
website_access: Option<UpdateBucketWebsiteAccess>,
quotas: Option<ApiBucketQuotas>,
}

View file

@ -17,6 +17,7 @@ pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>,
let res = garage
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
@ -68,7 +69,7 @@ pub async fn handle_create_key(
let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
garage.key_table.insert((), &key).await?;
key_info_results(garage, key, true).await
}
@ -85,7 +86,10 @@ pub async fn handle_import_key(
) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
let prev_key = garage
.key_table
.get((), &EmptyKey, &req.access_key_id)
.await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
}
@ -96,7 +100,7 @@ pub async fn handle_import_key(
req.name.as_deref().unwrap_or("Imported key"),
)
.ok_or_bad_request("Invalid key format")?;
garage.key_table.insert(&imported_key).await?;
garage.key_table.insert((), &imported_key).await?;
key_info_results(garage, imported_key, false).await
}
@ -134,7 +138,7 @@ pub async fn handle_update_key(
}
}
garage.key_table.insert(&key).await?;
garage.key_table.insert((), &key).await?;
key_info_results(garage, key, false).await
}
@ -184,7 +188,7 @@ async fn key_info_results(
.filter_map(|(_, _, v)| v.as_ref()),
) {
if !relevant_buckets.contains_key(id) {
if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? {
if let Some(b) = garage.bucket_table.get((), &EmptyKey, id).await? {
if b.state.as_option().is_some() {
relevant_buckets.insert(*id, b);
}

View file

@ -17,7 +17,10 @@ pub async fn handle_insert_batch(
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
@ -35,7 +38,11 @@ pub async fn handle_insert_batch(
items2.push((it.pk, it.sk, ct, v));
}
garage.k2v.rpc.insert_batch(*bucket_id, items2).await?;
garage
.k2v
.rpc
.insert_batch(*bucket_params.consistency_mode.get(), *bucket_id, items2)
.await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
@ -68,8 +75,12 @@ async fn handle_read_batch_query(
query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let partition = K2VItemPartition {
bucket_id: *bucket_id,
@ -92,7 +103,7 @@ async fn handle_read_batch_query(
let item = garage
.k2v
.item_table
.get(&partition, sk)
.get(c, &partition, sk)
.await?
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item {
@ -109,6 +120,7 @@ async fn handle_read_batch_query(
query.limit,
Some(filter),
EnumerationOrder::from_reverse(query.reverse),
c,
)
.await?;
@ -162,8 +174,12 @@ async fn handle_delete_batch_query(
query: DeleteBatchQuery,
) -> Result<DeleteBatchResponse, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let partition = K2VItemPartition {
bucket_id: *bucket_id,
@ -186,7 +202,7 @@ async fn handle_delete_batch_query(
let item = garage
.k2v
.item_table
.get(&partition, sk)
.get(c, &partition, sk)
.await?
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item {
@ -196,6 +212,7 @@ async fn handle_delete_batch_query(
.k2v
.rpc
.insert(
c,
*bucket_id,
i.partition.partition_key,
i.sort_key,
@ -217,6 +234,7 @@ async fn handle_delete_batch_query(
None,
Some(filter),
EnumerationOrder::Forward,
c,
)
.await?;
assert!(!more);
@ -236,7 +254,7 @@ async fn handle_delete_batch_query(
.collect::<Vec<_>>();
let n = items.len();
garage.k2v.rpc.insert_batch(*bucket_id, items).await?;
garage.k2v.rpc.insert_batch(c, *bucket_id, items).await?;
n
};
@ -257,7 +275,10 @@ pub(crate) async fn handle_poll_range(
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
use garage_model::k2v::sub::PollRange;
@ -269,6 +290,7 @@ pub(crate) async fn handle_poll_range(
.k2v
.rpc
.poll_range(
*bucket_params.consistency_mode.get(),
PollRange {
partition: K2VItemPartition {
bucket_id,

View file

@ -19,7 +19,10 @@ pub async fn handle_read_index(
reverse: Option<bool>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let reverse = reverse.unwrap_or(false);
@ -39,6 +42,7 @@ pub async fn handle_read_index(
limit,
Some((DeletedFilter::NotDeleted, node_id_vec)),
EnumerationOrder::from_reverse(reverse),
*bucket_params.consistency_mode.get(),
)
.await?;

View file

@ -101,7 +101,10 @@ pub async fn handle_read_item(
sort_key: &String,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let format = ReturnFormat::from(req)?;
@ -110,6 +113,7 @@ pub async fn handle_read_item(
.k2v
.item_table
.get(
*bucket_params.consistency_mode.get(),
&K2VItemPartition {
bucket_id: *bucket_id,
partition_key: partition_key.to_string(),
@ -129,7 +133,10 @@ pub async fn handle_insert_item(
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let causal_context = req
.headers()
@ -149,6 +156,7 @@ pub async fn handle_insert_item(
.k2v
.rpc
.insert(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
@ -169,7 +177,10 @@ pub async fn handle_delete_item(
sort_key: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let causal_context = req
.headers()
@ -185,6 +196,7 @@ pub async fn handle_delete_item(
.k2v
.rpc
.insert(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key.to_string(),
sort_key.to_string(),
@ -209,7 +221,10 @@ pub async fn handle_poll_item(
timeout_secs: Option<u64>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let format = ReturnFormat::from(req)?;
@ -222,6 +237,7 @@ pub async fn handle_poll_item(
.k2v
.rpc
.poll_item(
*bucket_params.consistency_mode.get(),
*bucket_id,
partition_key,
sort_key,

View file

@ -4,6 +4,7 @@
use std::sync::Arc;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
@ -22,6 +23,7 @@ pub(crate) async fn read_range<F>(
limit: Option<u64>,
filter: Option<F::Filter>,
enumeration_order: EnumerationOrder,
c: ConsistencyMode,
) -> Result<(Vec<F::E>, bool, Option<String>), Error>
where
F: TableSchema<S = String> + 'static,
@ -54,6 +56,7 @@ where
);
let get_ret = table
.get_range(
c,
partition_key,
start.clone(),
filter.clone(),

View file

@ -70,10 +70,10 @@ pub async fn handle_list_buckets(
let mut aliases = HashMap::new();
for bucket_id in ids.iter() {
let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?;
let bucket = garage.bucket_table.get((), &EmptyKey, bucket_id).await?;
if let Some(bucket) = bucket {
for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) {
let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
let alias_opt = garage.bucket_alias_table.get((), &EmptyKey, alias).await?;
if let Some(alias_ent) = alias_opt {
if *alias_ent.state.get() == Some(*bucket_id) {
aliases.insert(alias_ent.name().to_string(), *bucket_id);
@ -187,7 +187,7 @@ pub async fn handle_create_bucket(
}
let bucket = Bucket::new();
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
helper
.set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
@ -268,7 +268,7 @@ pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result<Response<ResBody>, Erro
state: Deletable::delete(),
};
// 3. delete bucket
garage.bucket_table.insert(&bucket).await?;
garage.bucket_table.insert((), &bucket).await?;
} else if is_local_alias {
// Just unalias
helper

View file

@ -9,6 +9,7 @@ use hyper::{Request, Response};
use serde::Serialize;
use garage_net::bytes_buf::BytesBuf;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
@ -33,13 +34,15 @@ pub async fn handle_copy(
) -> Result<Response<ResBody>, Error> {
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let source_object = get_copy_source(&ctx, req).await?;
let (source_object, source_c) = get_copy_source(&ctx, req).await?;
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
bucket_params: dest_bucket_params,
..
} = ctx;
let dest_c = *dest_bucket_params.consistency_mode.get();
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -80,13 +83,13 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
garage.object_table.insert(dest_c, &dest_object).await?;
}
ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
// Get block list from source version
let source_version = garage
.version_table
.get(&source_version.uuid, &EmptyKey)
.get(source_c, &source_version.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NoSuchKey)?;
@ -106,7 +109,7 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![tmp_dest_object_version],
);
garage.object_table.insert(&tmp_dest_object).await?;
garage.object_table.insert(dest_c, &tmp_dest_object).await?;
// Write version in the version table. Even with empty block list,
// this means that the BlockRef entries linked to this version cannot be
@ -120,7 +123,7 @@ pub async fn handle_copy(
},
false,
);
garage.version_table.insert(&dest_version).await?;
garage.version_table.insert(dest_c, &dest_version).await?;
// Fill in block list for version and insert block refs
for (bk, bv) in source_version.blocks.items().iter() {
@ -137,8 +140,10 @@ pub async fn handle_copy(
})
.collect::<Vec<_>>();
futures::try_join!(
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
garage.version_table.insert(dest_c, &dest_version),
garage
.block_ref_table
.insert_many(dest_c, &dest_block_refs[..]),
)?;
// Insert final object
@ -160,7 +165,7 @@ pub async fn handle_copy(
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
garage.object_table.insert(dest_c, &dest_object).await?;
}
}
@ -193,12 +198,17 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
let ((source_object, source_c), (_, _, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
let ReqCtx { garage, .. } = ctx;
let ReqCtx {
garage,
bucket_params: dest_bucket_params,
..
} = ctx;
let dest_c = *dest_bucket_params.consistency_mode.get();
let (source_object_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
@ -244,7 +254,7 @@ pub async fn handle_upload_part_copy(
// and destination version to check part hasn't yet been uploaded
let source_version = garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
.get(source_c, &source_object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -304,7 +314,7 @@ pub async fn handle_upload_part_copy(
size: None,
},
);
garage.mpu_table.insert(&dest_mpu).await?;
garage.mpu_table.insert(dest_c, &dest_mpu).await?;
let mut dest_version = Version::new(
dest_version_id,
@ -390,7 +400,7 @@ pub async fn handle_upload_part_copy(
if must_upload {
garage2
.block_manager
.rpc_put_block(final_hash, data, None)
.rpc_put_block(dest_c, final_hash, data, None)
.await
} else {
Ok(())
@ -398,9 +408,9 @@ pub async fn handle_upload_part_copy(
},
async {
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&dest_version).await?;
garage.version_table.insert(dest_c, &dest_version).await?;
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref).await
garage.block_ref_table.insert(dest_c, &block_ref).await
},
// Thing 4: we need to prefetch the next block
defragmenter.next(),
@ -422,7 +432,7 @@ pub async fn handle_upload_part_copy(
size: Some(current_offset),
},
);
garage.mpu_table.insert(&dest_mpu).await?;
garage.mpu_table.insert(dest_c, &dest_mpu).await?;
// LGTM
let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult {
@ -440,24 +450,33 @@ pub async fn handle_upload_part_copy(
.body(string_body(resp_xml))?)
}
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {
async fn get_copy_source(
ctx: &ReqCtx,
req: &Request<ReqBody>,
) -> Result<(Object, ConsistencyMode), Error> {
let ReqCtx {
garage, api_key, ..
} = ctx;
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
let (source_bucket_name, source_key) = parse_bucket_key(&copy_source, None)?;
let source_bucket_id = garage
.bucket_helper()
.resolve_bucket(&source_bucket.to_string(), api_key)
.resolve_bucket(&source_bucket_name.to_string(), api_key)
.await?;
let source_bucket = garage
.bucket_helper()
.get_existing_bucket(source_bucket_id)
.await?;
let source_bucket_state = source_bucket.state.as_option().unwrap();
let source_c = *source_bucket_state.consistency_mode.get();
if !api_key.allow_read(&source_bucket_id) {
return Err(Error::forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
source_bucket_name
)));
}
@ -465,11 +484,11 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
let source_object = garage
.object_table
.get(&source_bucket_id, &source_key.to_string())
.get(source_c, &source_bucket_id, &source_key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
Ok(source_object)
Ok((source_object, source_c))
}
fn extract_source_info(

View file

@ -57,7 +57,7 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error>
bucket_params.cors_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -91,7 +91,7 @@ pub async fn handle_put_cors(
.update(Some(conf.into_garage_cors_config()?));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -14,11 +14,16 @@ use crate::signature::verify_signed_content;
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(bucket_id, &key.to_string())
.get(c, bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?; // No need to delete
@ -49,7 +54,7 @@ async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid),
}],
);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
Ok((deleted_version, del_uuid))
}

View file

@ -14,11 +14,13 @@ use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
use garage_net::stream::ByteStream;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
@ -136,7 +138,15 @@ pub async fn handle_head(
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await
handle_head_without_ctx(
ctx.garage,
req,
ctx.bucket_id,
&ctx.bucket_params,
key,
part_number,
)
.await
}
/// Handle HEAD request for website
@ -144,12 +154,15 @@ pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
.get(c, &bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
@ -194,7 +207,7 @@ pub async fn handle_head_without_ctx(
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.get(c, &object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -234,7 +247,16 @@ pub async fn handle_get(
part_number: Option<u64>,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await
handle_get_without_ctx(
ctx.garage,
req,
ctx.bucket_id,
&ctx.bucket_params,
key,
part_number,
overrides,
)
.await
}
/// Handle GET request
@ -242,13 +264,16 @@ pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
let c = *bucket_params.consistency_mode.get();
let object = garage
.object_table
.get(&bucket_id, &key.to_string())
.get(c, &bucket_id, &key.to_string())
.await?
.ok_or(Error::NoSuchKey)?;
@ -277,10 +302,11 @@ pub async fn handle_get_without_ctx(
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
(Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
(Some(pn), None) => handle_get_part(garage, c, last_v, last_v_data, last_v_meta, pn).await,
(None, Some(range)) => {
handle_get_range(
garage,
c,
last_v,
last_v_data,
last_v_meta,
@ -289,12 +315,15 @@ pub async fn handle_get_without_ctx(
)
.await
}
(None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
(None, None) => {
handle_get_full(garage, c, last_v, last_v_data, last_v_meta, overrides).await
}
}
}
async fn handle_get_full(
garage: Arc<Garage>,
c: ConsistencyMode,
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -321,7 +350,7 @@ async fn handle_get_full(
match async {
let garage2 = garage.clone();
let version_fut = tokio::spawn(async move {
garage2.version_table.get(&version_uuid, &EmptyKey).await
garage2.version_table.get(c, &version_uuid, &EmptyKey).await
});
let stream_block_0 = garage
@ -362,6 +391,7 @@ async fn handle_get_full(
async fn handle_get_range(
garage: Arc<Garage>,
c: ConsistencyMode,
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -394,7 +424,7 @@ async fn handle_get_range(
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage
.version_table
.get(&version.uuid, &EmptyKey)
.get(c, &version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;
@ -406,6 +436,7 @@ async fn handle_get_range(
async fn handle_get_part(
garage: Arc<Garage>,
c: ConsistencyMode,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
@ -432,7 +463,7 @@ async fn handle_get_part(
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.get(c, &object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?;

View file

@ -44,7 +44,7 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E
bucket_params.lifecycle_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -78,7 +78,7 @@ pub async fn handle_put_lifecycle(
bucket_params.lifecycle_config.update(Some(config));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -63,11 +63,17 @@ pub async fn handle_list(
ctx: ReqCtx,
query: &ListObjectsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
t.get_range(
c,
&bucket,
key,
Some(ObjectFilter::IsData),
@ -169,12 +175,18 @@ pub async fn handle_list_multipart_upload(
ctx: ReqCtx,
query: &ListMultipartUploadsQuery,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let io = |bucket, key, count| {
let t = &garage.object_table;
async move {
t.get_range(
c,
&bucket,
key,
Some(ObjectFilter::IsUploading {

View file

@ -5,6 +5,7 @@ use futures::prelude::*;
use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5};
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::*;
use garage_util::data::*;
@ -32,9 +33,12 @@ pub async fn handle_create_multipart_upload(
garage,
bucket_id,
bucket_name,
bucket_params,
..
} = &ctx;
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let c = *bucket_params.consistency_mode.get();
let existing_object = garage.object_table.get(c, &bucket_id, key).await?;
let upload_id = gen_uuid();
let timestamp = next_timestamp(existing_object.as_ref());
@ -51,13 +55,13 @@ pub async fn handle_create_multipart_upload(
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// Create multipart upload in mpu table
// This multipart upload will hold references to uploaded parts
// (which are entries in the Version table)
let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
// Send success response
let result = s3_xml::InitiateMultipartUploadResult {
@ -79,7 +83,12 @@ pub async fn handle_put_part(
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
let ReqCtx {
garage,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let upload_id = decode_upload_id(upload_id)?;
@ -112,6 +121,7 @@ pub async fn handle_put_part(
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
consistency_mode: c,
upload_id,
version_uuid,
}));
@ -126,14 +136,14 @@ pub async fn handle_put_part(
size: None,
},
);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
let version = Version::new(
version_uuid,
VersionBacklink::MultipartUpload { upload_id },
false,
);
garage.version_table.insert(&version).await?;
garage.version_table.insert(c, &version).await?;
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
@ -157,7 +167,7 @@ pub async fn handle_put_part(
size: Some(total_size),
},
);
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(c, &mpu).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
@ -173,6 +183,7 @@ pub async fn handle_put_part(
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
consistency_mode: ConsistencyMode,
upload_id: Uuid,
version_uuid: Uuid,
}
@ -193,7 +204,12 @@ impl Drop for InterruptedCleanup {
},
true,
);
if let Err(e) = info.garage.version_table.insert(&version).await {
if let Err(e) = info
.garage
.version_table
.insert(info.consistency_mode, &version)
.await
{
warn!("Cannot cleanup after aborted UploadPart: {}", e);
}
});
@ -212,8 +228,10 @@ pub async fn handle_complete_multipart_upload(
garage,
bucket_id,
bucket_name,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
@ -279,7 +297,7 @@ pub async fn handle_complete_multipart_upload(
let grg = &garage;
let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move {
grg.version_table
.get(&p.version, &EmptyKey)
.get(c, &p.version, &EmptyKey)
.await?
.ok_or_internal_error("Part version missing from version table")
}))
@ -308,14 +326,14 @@ pub async fn handle_complete_multipart_upload(
);
}
}
garage.version_table.insert(&final_version).await?;
garage.version_table.insert(c, &final_version).await?;
let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef {
block: b.hash,
version: upload_id,
deleted: false.into(),
});
garage.block_ref_table.insert_many(block_refs).await?;
garage.block_ref_table.insert_many(c, block_refs).await?;
// Calculate etag of final object
// To understand how etags are calculated, read more here:
@ -336,7 +354,7 @@ pub async fn handle_complete_multipart_upload(
if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
return Err(e);
}
@ -352,7 +370,7 @@ pub async fn handle_complete_multipart_upload(
));
let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
// Send response saying ok we're done
let result = s3_xml::CompleteMultipartUploadResult {
@ -373,8 +391,12 @@ pub async fn handle_abort_multipart_upload(
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = &ctx;
let c = *bucket_params.consistency_mode.get();
let upload_id = decode_upload_id(upload_id)?;
@ -382,7 +404,7 @@ pub async fn handle_abort_multipart_upload(
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
garage.object_table.insert(c, &final_object).await?;
Ok(Response::new(empty_body()))
}
@ -396,13 +418,21 @@ pub(crate) async fn get_upload(
upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let (object, mpu) = futures::try_join!(
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage
.object_table
.get(c, bucket_id, key)
.map_err(Error::from),
garage
.mpu_table
.get(upload_id, &EmptyKey)
.get(c, upload_id, &EmptyKey)
.map_err(Error::from),
)?;

View file

@ -20,6 +20,7 @@ use opentelemetry::{
};
use garage_net::bytes_buf::BytesBuf;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::async_hash::*;
@ -71,13 +72,20 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
let ReqCtx {
garage, bucket_id, ..
garage,
bucket_id,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
garage.object_table.get(bucket_id, key).map_err(Error::from),
garage
.object_table
.get(c, bucket_id, key)
.map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
@ -120,7 +128,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
};
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
return Ok((version_uuid, data_md5sum_hex));
}
@ -131,6 +139,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
bucket_id: *bucket_id,
consistency_mode: c,
key: key.into(),
version_uuid,
version_timestamp,
@ -147,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
};
let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
@ -161,7 +170,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
false,
);
garage.version_table.insert(&version).await?;
garage.version_table.insert(c, &version).await?;
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
@ -187,7 +196,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
first_block_hash,
));
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
garage.object_table.insert(c, &object).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
@ -235,6 +244,7 @@ pub(crate) async fn check_quotas(
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let quotas = bucket_params.quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
@ -244,7 +254,7 @@ pub(crate) async fn check_quotas(
let counters = garage
.object_counter_table
.table
.get(bucket_id, &EmptyKey)
.get(c, bucket_id, &EmptyKey)
.await?;
let counters = counters
@ -451,7 +461,12 @@ async fn put_block_and_meta(
block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx;
let ReqCtx {
garage,
bucket_params,
..
} = ctx;
let c = *bucket_params.consistency_mode.get();
let mut version = version.clone();
version.blocks.put(
@ -474,9 +489,9 @@ async fn put_block_and_meta(
futures::try_join!(
garage
.block_manager
.rpc_put_block(hash, block, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
.rpc_put_block(c, hash, block, Some(order_tag)),
garage.version_table.insert(c, &version),
garage.block_ref_table.insert(c, &block_ref),
)?;
Ok(())
}
@ -529,6 +544,7 @@ struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
bucket_id: Uuid,
consistency_mode: ConsistencyMode,
key: String,
version_uuid: Uuid,
version_timestamp: u64,
@ -549,7 +565,12 @@ impl Drop for InterruptedCleanup {
state: ObjectVersionState::Aborted,
};
let object = Object::new(info.bucket_id, info.key, vec![object_version]);
if let Err(e) = info.garage.object_table.insert(&object).await {
if let Err(e) = info
.garage
.object_table
.insert(info.consistency_mode, &object)
.await
{
warn!("Cannot cleanup after aborted PutObject: {}", e);
}
});

View file

@ -49,7 +49,7 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err
bucket_params.website_config.update(None);
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()
@ -83,7 +83,7 @@ pub async fn handle_put_website(
.update(Some(conf.into_garage_website_config()?));
garage
.bucket_table
.insert(&Bucket::present(bucket_id, bucket_params))
.insert((), &Bucket::present(bucket_id, bucket_params))
.await?;
Ok(Response::builder()

View file

@ -370,7 +370,7 @@ pub async fn verify_v4(
let key = garage
.key_table
.get(&EmptyKey, &auth.key_id)
.get((), &EmptyKey, &auth.key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;

View file

@ -29,6 +29,7 @@ use garage_util::metrics::RecordDuration;
use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
@ -343,6 +344,7 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
consistency_mode: ConsistencyMode,
hash: Hash,
data: Bytes,
order_tag: Option<OrderTag>,
@ -367,7 +369,7 @@ impl BlockManager {
who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.system.layout_manager.write_quorum()),
.with_quorum(self.system.layout_manager.write_quorum(consistency_mode)),
)
.await?;

View file

@ -380,7 +380,12 @@ impl BlockResyncManager {
.layout_manager
.layout()
.storage_nodes_of(hash);
if who.len() < manager.system.layout_manager.write_quorum() {
if who.len()
< manager
.system
.layout_manager
.write_quorum(Default::default())
{
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
who.retain(|id| *id != manager.system.id);

View file

@ -30,7 +30,14 @@ impl AdminRpcHandler {
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.get_range(
Default::default(),
&hash,
None,
None,
10000,
Default::default(),
)
.await?;
let mut versions = vec![];
let mut uploads = vec![];
@ -38,11 +45,16 @@ impl AdminRpcHandler {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.get(Default::default(), &br.version, &EmptyKey)
.await?
{
if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if let Some(u) = self
.garage
.mpu_table
.get(Default::default(), upload_id, &EmptyKey)
.await?
{
uploads.push(u);
}
}
@ -108,14 +120,21 @@ impl AdminRpcHandler {
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.get_range(
Default::default(),
&hash,
None,
None,
10000,
Default::default(),
)
.await?;
for br in block_refs {
if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.get(Default::default(), &br.version, &EmptyKey)
.await?
{
self.handle_block_purge_version_backlink(
@ -127,7 +146,10 @@ impl AdminRpcHandler {
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
self.garage.version_table.insert(&deleted_version).await?;
self.garage
.version_table
.insert(Default::default(), &deleted_version)
.await?;
ver_dels += 1;
}
}
@ -152,11 +174,19 @@ impl AdminRpcHandler {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if let Some(mut mpu) = self
.garage
.mpu_table
.get(Default::default(), upload_id, &EmptyKey)
.await?
{
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
self.garage
.mpu_table
.insert(Default::default(), &mpu)
.await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
@ -166,7 +196,12 @@ impl AdminRpcHandler {
}
};
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
if let Some(object) = self
.garage
.object_table
.get(Default::default(), &bucket_id, &key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
@ -180,7 +215,10 @@ impl AdminRpcHandler {
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
self.garage
.object_table
.insert(Default::default(), &deleted_object)
.await?;
*obj_dels += 1;
}
}

View file

@ -39,6 +39,7 @@ impl AdminRpcHandler {
.garage
.bucket_table
.get_range(
(),
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
@ -63,12 +64,14 @@ impl AdminRpcHandler {
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_params = bucket.state.as_option().unwrap();
let c = *bucket_params.consistency_mode.get();
let counters = self
.garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
@ -77,42 +80,28 @@ impl AdminRpcHandler {
.garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.get(c, &bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
.as_option()
.unwrap()
.authorized_keys
.items()
.iter()
{
for (k, _) in bucket_params.authorized_keys.items().iter() {
if let Some(key) = self
.garage
.key_table
.get(&EmptyKey, k)
.get((), &EmptyKey, k)
.await?
.filter(|k| !k.is_deleted())
{
relevant_keys.insert(k.clone(), key);
}
}
for ((k, _), _, _) in bucket
.state
.as_option()
.unwrap()
.local_aliases
.items()
.iter()
{
for ((k, _), _, _) in bucket_params.local_aliases.items().iter() {
if relevant_keys.contains_key(k) {
continue;
}
if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
if let Some(key) = self.garage.key_table.get((), &EmptyKey, k).await? {
relevant_keys.insert(k.clone(), key);
}
}
@ -136,7 +125,12 @@ impl AdminRpcHandler {
let helper = self.garage.locked_helper().await;
if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
if let Some(alias) = self
.garage
.bucket_alias_table
.get((), &EmptyKey, name)
.await?
{
if alias.state.get().is_some() {
return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
}
@ -145,7 +139,7 @@ impl AdminRpcHandler {
// ---- done checking, now commit ----
let bucket = Bucket::new();
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
helper.set_global_bucket_alias(bucket.id, name).await?;
@ -170,7 +164,7 @@ impl AdminRpcHandler {
let bucket_alias = self
.garage
.bucket_alias_table
.get(&EmptyKey, &query.name)
.get((), &EmptyKey, &query.name)
.await?;
// Check bucket doesn't have other aliases
@ -225,7 +219,7 @@ impl AdminRpcHandler {
// 3. delete bucket
bucket.state = Deletable::delete();
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
}
@ -405,7 +399,7 @@ impl AdminRpcHandler {
};
bucket_state.website_config.update(website);
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
let msg = if query.allow {
format!("Website access allowed for {}", &query.bucket)
@ -462,7 +456,7 @@ impl AdminRpcHandler {
}
bucket_state.quotas.update(quotas);
self.garage.bucket_table.insert(&bucket).await?;
self.garage.bucket_table.insert((), &bucket).await?;
Ok(AdminRpc::Ok(format!(
"Quotas updated for {}",

View file

@ -28,6 +28,7 @@ impl AdminRpcHandler {
.garage
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
@ -57,7 +58,7 @@ impl AdminRpcHandler {
async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
let key = Key::new(&query.name);
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -71,7 +72,7 @@ impl AdminRpcHandler {
.unwrap()
.name
.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -106,7 +107,7 @@ impl AdminRpcHandler {
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(true);
}
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -119,7 +120,7 @@ impl AdminRpcHandler {
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(false);
}
self.garage.key_table.insert(&key).await?;
self.garage.key_table.insert((), &key).await?;
self.key_info_result(key).await
}
@ -128,14 +129,18 @@ impl AdminRpcHandler {
return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
}
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
let prev_key = self
.garage
.key_table
.get((), &EmptyKey, &query.key_id)
.await?;
if prev_key.is_some() {
return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name)
.ok_or_bad_request("Invalid key format")?;
self.garage.key_table.insert(&imported_key).await?;
self.garage.key_table.insert((), &imported_key).await?;
self.key_info_result(imported_key).await
}
@ -151,7 +156,7 @@ impl AdminRpcHandler {
.items()
.iter()
{
if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
if let Some(b) = self.garage.bucket_table.get((), &EmptyKey, id).await? {
relevant_buckets.insert(*id, b);
}
}

View file

@ -176,7 +176,7 @@ impl TableRepair for RepairVersions {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
.object_table
.get(bucket_id, key)
.get(Default::default(), bucket_id, key)
.await?
.map(|o| {
o.versions().iter().any(|x| {
@ -186,7 +186,7 @@ impl TableRepair for RepairVersions {
.unwrap_or(false),
VersionBacklink::MultipartUpload { upload_id } => garage
.mpu_table
.get(upload_id, &EmptyKey)
.get(Default::default(), upload_id, &EmptyKey)
.await?
.map(|u| !u.deleted.get())
.unwrap_or(false),
@ -196,7 +196,10 @@ impl TableRepair for RepairVersions {
info!("Repair versions: marking version as deleted: {:?}", version);
garage
.version_table
.insert(&Version::new(version.uuid, version.backlink, true))
.insert(
Default::default(),
&Version::new(version.uuid, version.backlink, true),
)
.await?;
return Ok(true);
}
@ -222,7 +225,7 @@ impl TableRepair for RepairBlockRefs {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
.get(&block_ref.version, &EmptyKey)
.get(Default::default(), &block_ref.version, &EmptyKey)
.await?
.map(|v| !v.deleted.get())
.unwrap_or(false);
@ -233,7 +236,10 @@ impl TableRepair for RepairBlockRefs {
block_ref
);
block_ref.deleted.set();
garage.block_ref_table.insert(&block_ref).await?;
garage
.block_ref_table
.insert(Default::default(), &block_ref)
.await?;
return Ok(true);
}
}
@ -258,7 +264,7 @@ impl TableRepair for RepairMpu {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
.get(&mpu.bucket_id, &mpu.key)
.get(Default::default(), &mpu.bucket_id, &mpu.key)
.await?
.map(|o| {
o.versions()
@ -274,7 +280,7 @@ impl TableRepair for RepairMpu {
);
mpu.parts.clear();
mpu.deleted.set();
garage.mpu_table.insert(&mpu).await?;
garage.mpu_table.insert(Default::default(), &mpu).await?;
return Ok(true);
}
}

View file

@ -1,3 +1,4 @@
use garage_rpc::replication_mode::ConsistencyMode;
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
@ -119,7 +120,94 @@ mod v08 {
impl garage_util::migrate::InitialFormat for Bucket {}
}
pub use v08::*;
mod v011 {
use super::v08;
pub use super::v08::{
BucketQuotas, CorsRule, LifecycleExpiration, LifecycleFilter, LifecycleRule, WebsiteConfig,
};
use crate::permission::BucketKeyPerm;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
/// A bucket is a collection of objects
///
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// ID of the bucket
pub id: Uuid,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::Deletable<BucketParams>,
}
/// Configuration for a bucket
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Bucket's creation date
pub creation_date: u64,
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
/// Map of aliases that are or have been given to this bucket
/// in the global namespace
/// (not authoritative: this is just used as an indication to
/// map back to aliases when doing ListBuckets)
pub aliases: crdt::LwwMap<String, bool>,
/// Map of aliases that are or have been given to this bucket
/// in namespaces local to keys
/// key = (access key id, alias name)
pub local_aliases: crdt::LwwMap<(String, String), bool>,
/// Whether to enable read-after-write consistency for this bucket
pub consistency_mode: crdt::Lww<ConsistencyMode>,
/// Whether this bucket is allowed for website access
/// (under all of its global alias names),
/// and if so, the website configuration XML document
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
/// Lifecycle configuration
#[serde(default)]
pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
/// Bucket quotas
#[serde(default)]
pub quotas: crdt::Lww<BucketQuotas>,
}
impl garage_util::migrate::Migrate for Bucket {
const VERSION_MARKER: &'static [u8] = b"G011lh";
type Previous = v08::Bucket;
fn migrate(previous: Self::Previous) -> Self {
Self {
id: previous.id,
state: match previous.state {
crdt::Deletable::Present(prev_state) => {
crdt::Deletable::Present(BucketParams {
creation_date: prev_state.creation_date,
authorized_keys: prev_state.authorized_keys,
aliases: prev_state.aliases,
local_aliases: prev_state.local_aliases,
website_config: prev_state.website_config,
cors_config: prev_state.cors_config,
lifecycle_config: prev_state.lifecycle_config,
quotas: prev_state.quotas,
consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent),
})
}
crdt::Deletable::Deleted => crdt::Deletable::Deleted,
},
}
}
}
}
pub use v011::*;
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
@ -133,6 +221,7 @@ impl BucketParams {
authorized_keys: crdt::Map::new(),
aliases: crdt::LwwMap::new(),
local_aliases: crdt::LwwMap::new(),
consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
lifecycle_config: crdt::Lww::new(None),

View file

@ -36,7 +36,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.filter(|x| !x.state.is_deleted())
.map(|_| bucket_id))
@ -44,7 +44,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_alias_table
.get(&EmptyKey, bucket_name)
.get((), &EmptyKey, bucket_name)
.await?
.and_then(|x| *x.state.get()))
}
@ -74,7 +74,7 @@ impl<'a> BucketHelper<'a> {
Ok(self
.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?)
}
@ -86,7 +86,7 @@ impl<'a> BucketHelper<'a> {
pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
self.0
.bucket_table
.get(&EmptyKey, &bucket_id)
.get((), &EmptyKey, &bucket_id)
.await?
.filter(|b| !b.is_deleted())
.ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id)))
@ -95,10 +95,20 @@ impl<'a> BucketHelper<'a> {
// ----
pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
let consistency_mode = *self
.get_existing_bucket(bucket_id)
.await?
.state
.as_option()
.unwrap()
.consistency_mode
.get();
let objects = self
.0
.object_table
.get_range(
consistency_mode,
&bucket_id,
None,
Some(ObjectFilter::IsData),
@ -124,6 +134,7 @@ impl<'a> BucketHelper<'a> {
.counter_table
.table
.get_range(
consistency_mode,
&bucket_id,
None,
Some((DeletedFilter::NotDeleted, node_id_vec)),
@ -151,6 +162,12 @@ impl<'a> BucketHelper<'a> {
older_than: Duration,
) -> Result<usize, Error> {
let older_than = now_msec() - older_than.as_millis() as u64;
let consistency_mode = self
.get_existing_bucket(*bucket_id)
.await?
.params()
.map(|params| *params.consistency_mode.get())
.unwrap_or_default();
let mut ret = 0usize;
let mut start = None;
@ -160,6 +177,7 @@ impl<'a> BucketHelper<'a> {
.0
.object_table
.get_range(
consistency_mode,
bucket_id,
start,
Some(ObjectFilter::IsUploading {
@ -196,7 +214,10 @@ impl<'a> BucketHelper<'a> {
.collect::<Vec<_>>();
ret += abortions.len();
self.0.object_table.insert_many(abortions).await?;
self.0
.object_table
.insert_many(consistency_mode, abortions)
.await?;
if objects.len() < 1000 {
break;

View file

@ -16,7 +16,7 @@ impl<'a> KeyHelper<'a> {
Ok(self
.0
.key_table
.get(&EmptyKey, key_id)
.get((), &EmptyKey, key_id)
.await?
.ok_or_message(format!("Key {} does not exist", key_id))?)
}
@ -28,7 +28,7 @@ impl<'a> KeyHelper<'a> {
pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
self.0
.key_table
.get(&EmptyKey, key_id)
.get((), &EmptyKey, key_id)
.await?
.filter(|b| !b.state.is_deleted())
.ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string()))
@ -44,6 +44,7 @@ impl<'a> KeyHelper<'a> {
.0
.key_table
.get_range(
(),
&EmptyKey,
None,
Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),

View file

@ -56,7 +56,11 @@ impl<'a> LockedHelper<'a> {
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?;
let alias = self
.0
.bucket_alias_table
.get((), &EmptyKey, alias_name)
.await?;
if let Some(existing_alias) = alias.as_ref() {
if let Some(p_bucket) = existing_alias.state.get() {
@ -88,10 +92,10 @@ impl<'a> LockedHelper<'a> {
a
}
};
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -112,7 +116,7 @@ impl<'a> LockedHelper<'a> {
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.get((), &EmptyKey, alias_name)
.await?
.filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false))
.ok_or_message(format!(
@ -144,10 +148,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -168,7 +172,7 @@ impl<'a> LockedHelper<'a> {
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.get((), &EmptyKey, alias_name)
.await?
.ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
@ -186,12 +190,12 @@ impl<'a> LockedHelper<'a> {
if alias.state.get() == &Some(bucket_id) {
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
self.0.bucket_alias_table.insert((), &alias).await?;
}
if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
}
Ok(())
@ -245,10 +249,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id));
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -317,10 +321,10 @@ impl<'a> LockedHelper<'a> {
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None);
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
Ok(())
}
@ -365,12 +369,12 @@ impl<'a> LockedHelper<'a> {
if let Some(bstate) = bucket.state.as_option_mut() {
bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm);
self.0.bucket_table.insert(&bucket).await?;
self.0.bucket_table.insert((), &bucket).await?;
}
if let Some(kstate) = key.state.as_option_mut() {
kstate.authorized_buckets = Map::put_mutator(bucket_id, perm);
self.0.key_table.insert(&key).await?;
self.0.key_table.insert((), &key).await?;
}
Ok(())
@ -403,7 +407,7 @@ impl<'a> LockedHelper<'a> {
// 3. Actually delete key
key.state = Deletable::delete();
self.0.key_table.insert(key).await?;
self.0.key_table.insert((), key).await?;
Ok(())
}

View file

@ -23,6 +23,7 @@ use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::now_msec;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_rpc::system::System;
use garage_rpc::*;
@ -43,8 +44,8 @@ const TIMESTAMP_KEY: &[u8] = b"timestamp";
#[derive(Debug, Serialize, Deserialize)]
enum K2VRpc {
Ok,
InsertItem(InsertedItem),
InsertManyItems(Vec<InsertedItem>),
InsertItem(ConsistencyMode, InsertedItem),
InsertManyItems(ConsistencyMode, Vec<InsertedItem>),
PollItem {
key: PollKey,
causal_context: CausalContext,
@ -113,6 +114,7 @@ impl K2VRpcHandler {
pub async fn insert(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
@ -135,12 +137,15 @@ impl K2VRpcHandler {
.try_call_many(
&self.endpoint,
&who,
K2VRpc::InsertItem(InsertedItem {
partition,
sort_key,
causal_context,
value,
}),
K2VRpc::InsertItem(
consistency_mode,
InsertedItem {
partition,
sort_key,
causal_context,
value,
},
),
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@ -150,6 +155,7 @@ impl K2VRpcHandler {
pub async fn insert_batch(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
) -> Result<(), Error> {
@ -189,7 +195,7 @@ impl K2VRpcHandler {
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
K2VRpc::InsertManyItems(consistency_mode, items),
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@ -206,6 +212,7 @@ impl K2VRpcHandler {
pub async fn poll_item(
&self,
consistency_mode: ConsistencyMode,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
@ -235,7 +242,12 @@ impl K2VRpcHandler {
timeout_msec,
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
.with_quorum(
self.item_table
.data
.replication
.read_quorum(consistency_mode),
)
.send_all_at_once(true)
.without_timeout(),
);
@ -266,6 +278,7 @@ impl K2VRpcHandler {
pub async fn poll_range(
&self,
consistency_mode: ConsistencyMode,
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
@ -288,7 +301,11 @@ impl K2VRpcHandler {
.data
.replication
.read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let quorum = self
.item_table
.data
.replication
.read_quorum(consistency_mode);
let msg = K2VRpc::PollRange {
range,
seen_str,
@ -376,7 +393,11 @@ impl K2VRpcHandler {
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
async fn handle_insert(
&self,
c: ConsistencyMode,
item: &InsertedItem,
) -> Result<K2VRpc, Error> {
let new = {
let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
self.local_insert(&local_timestamp_tree, item)?
@ -384,13 +405,17 @@ impl K2VRpcHandler {
// Propagate to rest of network
if let Some(updated) = new {
self.item_table.insert(&updated).await?;
self.item_table.insert(c, &updated).await?;
}
Ok(K2VRpc::Ok)
}
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
async fn handle_insert_many(
&self,
c: ConsistencyMode,
items: &[InsertedItem],
) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
{
@ -406,7 +431,7 @@ impl K2VRpcHandler {
// Propagate to rest of network
if !updated_vec.is_empty() {
self.item_table.insert_many(&updated_vec).await?;
self.item_table.insert_many(c, &updated_vec).await?;
}
Ok(K2VRpc::Ok)
@ -546,8 +571,8 @@ impl K2VRpcHandler {
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
K2VRpc::InsertItem(item) => self.handle_insert(item).await,
K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
K2VRpc::InsertItem(c, item) => self.handle_insert(*c, item).await,
K2VRpc::InsertManyItems(c, items) => self.handle_insert_many(*c, &items[..]).await,
K2VRpc::PollItem {
key,
causal_context,

View file

@ -1,5 +1,6 @@
use std::sync::Arc;
use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::encode::nonversioned_decode;
@ -71,19 +72,23 @@ impl Migrate {
self.garage
.bucket_table
.insert(&Bucket {
id: bucket_id,
state: Deletable::Present(BucketParams {
creation_date: now_msec(),
authorized_keys: Map::new(),
aliases: LwwMap::new(),
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
})
.insert(
(),
&Bucket {
id: bucket_id,
state: Deletable::Present(BucketParams {
creation_date: now_msec(),
authorized_keys: Map::new(),
aliases: LwwMap::new(),
local_aliases: LwwMap::new(),
consistency_mode: Lww::new(ConsistencyMode::Consistent),
website_config: Lww::new(website),
cors_config: Lww::new(None),
lifecycle_config: Lww::new(None),
quotas: Lww::new(Default::default()),
}),
},
)
.await?;
helper.set_global_bucket_alias(bucket_id, &new_name).await?;

View file

@ -253,7 +253,7 @@ async fn process_object(
_ => {
match garage
.bucket_table
.get(&EmptyKey, &object.bucket_id)
.get((), &EmptyKey, &object.bucket_id)
.await?
{
Some(b) => b,

View file

@ -139,12 +139,14 @@ impl LayoutManager {
}
}
pub fn read_quorum(self: &Arc<Self>) -> usize {
self.replication_factor.read_quorum(self.consistency_mode)
pub fn read_quorum(self: &Arc<Self>, bucket_consistency_mode: ConsistencyMode) -> usize {
self.replication_factor
.read_quorum(bucket_consistency_mode.min(self.consistency_mode))
}
pub fn write_quorum(self: &Arc<Self>) -> usize {
self.replication_factor.write_quorum(self.consistency_mode)
pub fn write_quorum(self: &Arc<Self>, bucket_consistency_mode: ConsistencyMode) -> usize {
self.replication_factor
.write_quorum(bucket_consistency_mode.min(self.consistency_mode))
}
// ---- ACK LOCKING ----

View file

@ -51,7 +51,7 @@ impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
return Ok(WorkerState::Idle);
}
self.0.insert_many(values).await?;
self.0.insert_many(Default::default(), values).await?;
self.0.data.insert_queue.db().transaction(|tx| {
for (k, v) in kv_pairs.iter() {

View file

@ -25,6 +25,7 @@ pub struct TableFullReplication {
impl TableReplication for TableFullReplication {
type WriteSets = Vec<Vec<Uuid>>;
type ConsistencyParam = ();
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout();
@ -34,14 +35,14 @@ impl TableReplication for TableFullReplication {
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
fn read_quorum(&self, _: ()) -> usize {
1
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
fn write_quorum(&self, _: ()) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
let max_faults = if nmembers > 1 { 1 } else { 0 };

View file

@ -4,6 +4,7 @@ use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;
type ConsistencyParam: Send + Default;
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
@ -14,12 +15,12 @@ pub trait TableReplication: Send + Sync + 'static {
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
fn read_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize;
/// Which nodes to send writes to
fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
/// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn write_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash

View file

@ -2,6 +2,7 @@ use std::sync::Arc;
use garage_rpc::layout::manager::LayoutManager;
use garage_rpc::layout::*;
use garage_rpc::replication_mode::*;
use garage_util::data::*;
use crate::replication::*;
@ -20,6 +21,7 @@ pub struct TableShardedReplication {
impl TableReplication for TableShardedReplication {
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
type ConsistencyParam = ConsistencyMode;
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.layout_manager.layout().storage_nodes_of(hash)
@ -28,15 +30,15 @@ impl TableReplication for TableShardedReplication {
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.layout_manager.layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.layout_manager.read_quorum()
fn read_quorum(&self, c: ConsistencyMode) -> usize {
self.layout_manager.read_quorum(c)
}
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
self.layout_manager.write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.layout_manager.write_quorum()
fn write_quorum(&self, c: ConsistencyMode) -> usize {
self.layout_manager.write_quorum(c)
}
fn partition_of(&self, hash: &Hash) -> Partition {

View file

@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
);
let mut result_tracker = QuorumSetResultTracker::new(
&partition.storage_sets,
self.data.replication.write_quorum(),
self.data.replication.write_quorum(Default::default()),
);
let mut sync_futures = result_tracker
@ -190,7 +190,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
);
break;
}
if nodes.len() < self.data.replication.write_quorum() {
if nodes.len() < self.data.replication.write_quorum(Default::default()) {
return Err(Error::Message(
"Not offloading as we don't have a quorum of nodes to write to."
.to_string(),

View file

@ -104,11 +104,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
bg.spawn_worker(InsertQueueWorker(self.clone()));
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
pub async fn insert(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
self.insert_internal(e)
self.insert_internal(c, e)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -118,7 +118,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
async fn insert_internal(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.data.replication.write_sets(&hash);
@ -132,7 +132,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
who.as_ref(),
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum()),
.with_quorum(self.data.replication.write_quorum(c)),
)
.await?;
@ -144,7 +144,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
self.data.queue_insert(tx, e)
}
pub async fn insert_many<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
pub async fn insert_many<I, IE>(
self: &Arc<Self>,
c: R::ConsistencyParam,
entries: I,
) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@ -152,7 +156,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
self.insert_many_internal(entries)
self.insert_many_internal(c, entries)
.bound_record_duration(&self.data.metrics.put_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -162,7 +166,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
Ok(())
}
async fn insert_many_internal<I, IE>(self: &Arc<Self>, entries: I) -> Result<(), Error>
async fn insert_many_internal<I, IE>(
self: &Arc<Self>,
c: R::ConsistencyParam,
entries: I,
) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
IE: Borrow<F::E> + Send + Sync,
@ -181,7 +189,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
// a quorum of nodes has answered OK, then the insert has succeeded and
// consistency properties (read-after-write) are preserved.
let quorum = self.data.replication.write_quorum();
let quorum = self.data.replication.write_quorum(c);
// Serialize all entries and compute the write sets for each of them.
// In the case of sharded table replication, this also takes an "ack lock"
@ -283,6 +291,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
pub async fn get(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
@ -290,7 +299,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let span = tracer.start(format!("{} get", F::TABLE_NAME));
let res = self
.get_internal(partition_key, sort_key)
.get_internal(c, partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span))
.await?;
@ -302,6 +311,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn get_internal(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
@ -317,7 +327,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
&who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum()),
.with_quorum(self.data.replication.read_quorum(c)),
)
.await?;
@ -359,6 +369,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
pub async fn get_range(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
@ -370,6 +381,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let res = self
.get_range_internal(
c,
partition_key,
begin_sort_key,
filter,
@ -387,6 +399,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn get_range_internal(
self: &Arc<Self>,
c: R::ConsistencyParam,
partition_key: &F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
@ -412,7 +425,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
&who,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum()),
.with_quorum(self.data.replication.read_quorum(c)),
)
.await?;

View file

@ -28,6 +28,7 @@ use garage_api::s3::error::{
};
use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::bucket_table::BucketParams;
use garage_model::garage::Garage;
use garage_table::*;
@ -182,11 +183,20 @@ impl WebServer {
}
}
async fn check_key_exists(self: &Arc<Self>, bucket_id: Uuid, key: &str) -> Result<bool, Error> {
async fn check_key_exists(
self: &Arc<Self>,
bucket_id: Uuid,
bucket_params: &BucketParams,
key: &str,
) -> Result<bool, Error> {
let exists = self
.garage
.object_table
.get(&bucket_id, &key.to_string())
.get(
*bucket_params.consistency_mode.get(),
&bucket_id,
&key.to_string(),
)
.await?
.map(|object| object.versions().iter().any(|v| v.is_data()))
.unwrap_or(false);
@ -211,7 +221,7 @@ impl WebServer {
let bucket_id = self
.garage
.bucket_alias_table
.get(&EmptyKey, &bucket_name.to_string())
.get((), &EmptyKey, &bucket_name.to_string())
.await?
.and_then(|x| x.state.take())
.ok_or(Error::NotFound)?;
@ -246,13 +256,22 @@ impl WebServer {
.map_err(ApiError::from)
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
Method::HEAD => {
handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await
handle_head_without_ctx(
self.garage.clone(),
req,
bucket_id,
&bucket_params,
&key,
None,
)
.await
}
Method::GET => {
handle_get_without_ctx(
self.garage.clone(),
req,
bucket_id,
&bucket_params,
&key,
None,
Default::default(),
@ -265,7 +284,9 @@ impl WebServer {
// Try implicit redirect on error
let ret_doc_with_redir = match (&ret_doc, may_redirect) {
(Err(ApiError::NoSuchKey), ImplicitRedirect::To { key, url })
if self.check_key_exists(bucket_id, key.as_str()).await? =>
if self
.check_key_exists(bucket_id, &bucket_params, key.as_str())
.await? =>
{
Ok(Response::builder()
.status(StatusCode::FOUND)
@ -306,6 +327,7 @@ impl WebServer {
self.garage.clone(),
&req2,
bucket_id,
&bucket_params,
&error_document,
None,
Default::default(),