diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index ac3cba00..4d53cc16 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -10,6 +10,8 @@ use garage_util::time::*; use garage_table::*; +use garage_rpc::replication_mode::ConsistencyMode; + use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; @@ -27,6 +29,7 @@ pub async fn handle_list_buckets(garage: &Arc) -> Result>(), + consistency_mode: *state.consistency_mode.get(), website_access: state.website_config.get().is_some(), website_config: state.website_config.get().clone().map(|wsc| { GetBucketInfoWebsiteResult { @@ -238,6 +244,7 @@ async fn bucket_info_results( struct GetBucketInfoResult { id: String, global_aliases: Vec, + consistency_mode: ConsistencyMode, website_access: bool, #[serde(default)] website_config: Option, @@ -283,7 +290,7 @@ pub async fn handle_create_bucket( ))); } - if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { + if let Some(alias) = garage.bucket_alias_table.get((), &EmptyKey, ga).await? { if alias.state.get().is_some() { return Err(CommonError::BucketAlreadyExists.into()); } @@ -306,7 +313,7 @@ pub async fn handle_create_bucket( } let bucket = Bucket::new(); - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; if let Some(ga) = &req.global_alias { helper.set_global_bucket_alias(bucket.id, ga).await?; @@ -394,7 +401,7 @@ pub async fn handle_delete_bucket( // 4. delete bucket bucket.state = Deletable::delete(); - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -416,6 +423,10 @@ pub async fn handle_update_bucket( let state = bucket.state.as_option_mut().unwrap(); + if let Some(cm) = req.consistency_mode { + state.consistency_mode.update(cm); + } + if let Some(wa) = req.website_access { if wa.enabled { state.website_config.update(Some(WebsiteConfig { @@ -441,7 +452,7 @@ pub async fn handle_update_bucket( }); } - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; bucket_info_results(garage, bucket_id).await } @@ -449,6 +460,7 @@ pub async fn handle_update_bucket( #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct UpdateBucketRequest { + consistency_mode: Option, website_access: Option, quotas: Option, } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 291b6d54..9f1a152c 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -17,6 +17,7 @@ pub async fn handle_list_keys(garage: &Arc) -> Result, let res = garage .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), @@ -68,7 +69,7 @@ pub async fn handle_create_key( let req = parse_json_body::(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); - garage.key_table.insert(&key).await?; + garage.key_table.insert((), &key).await?; key_info_results(garage, key, true).await } @@ -85,7 +86,10 @@ pub async fn handle_import_key( ) -> Result, Error> { let req = parse_json_body::(req).await?; - let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; + let prev_key = garage + .key_table + .get((), &EmptyKey, &req.access_key_id) + .await?; if prev_key.is_some() { return Err(Error::KeyAlreadyExists(req.access_key_id.to_string())); } @@ -96,7 +100,7 @@ pub async fn handle_import_key( req.name.as_deref().unwrap_or("Imported key"), ) .ok_or_bad_request("Invalid key format")?; - garage.key_table.insert(&imported_key).await?; + garage.key_table.insert((), &imported_key).await?; key_info_results(garage, imported_key, false).await } @@ -134,7 +138,7 @@ pub async fn handle_update_key( } } - garage.key_table.insert(&key).await?; + garage.key_table.insert((), &key).await?; key_info_results(garage, key, false).await } @@ -184,7 +188,7 @@ async fn key_info_results( .filter_map(|(_, _, v)| v.as_ref()), ) { if !relevant_buckets.contains_key(id) { - if let Some(b) = garage.bucket_table.get(&EmptyKey, id).await? { + if let Some(b) = garage.bucket_table.get((), &EmptyKey, id).await? { if b.state.as_option().is_some() { relevant_buckets.insert(*id, b); } diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 02b7ae8b..b3d96dff 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -17,7 +17,10 @@ pub async fn handle_insert_batch( req: Request, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let items = parse_json_body::, _, Error>(req).await?; @@ -35,7 +38,11 @@ pub async fn handle_insert_batch( items2.push((it.pk, it.sk, ct, v)); } - garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; + garage + .k2v + .rpc + .insert_batch(*bucket_params.consistency_mode.get(), *bucket_id, items2) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -68,8 +75,12 @@ async fn handle_read_batch_query( query: ReadBatchQuery, ) -> Result { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); let partition = K2VItemPartition { bucket_id: *bucket_id, @@ -92,7 +103,7 @@ async fn handle_read_batch_query( let item = garage .k2v .item_table - .get(&partition, sk) + .get(c, &partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { @@ -109,6 +120,7 @@ async fn handle_read_batch_query( query.limit, Some(filter), EnumerationOrder::from_reverse(query.reverse), + c, ) .await?; @@ -162,8 +174,12 @@ async fn handle_delete_batch_query( query: DeleteBatchQuery, ) -> Result { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; + let c = *bucket_params.consistency_mode.get(); let partition = K2VItemPartition { bucket_id: *bucket_id, @@ -186,7 +202,7 @@ async fn handle_delete_batch_query( let item = garage .k2v .item_table - .get(&partition, sk) + .get(c, &partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { @@ -196,6 +212,7 @@ async fn handle_delete_batch_query( .k2v .rpc .insert( + c, *bucket_id, i.partition.partition_key, i.sort_key, @@ -217,6 +234,7 @@ async fn handle_delete_batch_query( None, Some(filter), EnumerationOrder::Forward, + c, ) .await?; assert!(!more); @@ -236,7 +254,7 @@ async fn handle_delete_batch_query( .collect::>(); let n = items.len(); - garage.k2v.rpc.insert_batch(*bucket_id, items).await?; + garage.k2v.rpc.insert_batch(c, *bucket_id, items).await?; n }; @@ -257,7 +275,10 @@ pub(crate) async fn handle_poll_range( req: Request, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; use garage_model::k2v::sub::PollRange; @@ -269,6 +290,7 @@ pub(crate) async fn handle_poll_range( .k2v .rpc .poll_range( + *bucket_params.consistency_mode.get(), PollRange { partition: K2VItemPartition { bucket_id, diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index e3397238..9e804d05 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -19,7 +19,10 @@ pub async fn handle_read_index( reverse: Option, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let reverse = reverse.unwrap_or(false); @@ -39,6 +42,7 @@ pub async fn handle_read_index( limit, Some((DeletedFilter::NotDeleted, node_id_vec)), EnumerationOrder::from_reverse(reverse), + *bucket_params.consistency_mode.get(), ) .await?; diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index af3af4e4..dfa30b33 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -101,7 +101,10 @@ pub async fn handle_read_item( sort_key: &String, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let format = ReturnFormat::from(req)?; @@ -110,6 +113,7 @@ pub async fn handle_read_item( .k2v .item_table .get( + *bucket_params.consistency_mode.get(), &K2VItemPartition { bucket_id: *bucket_id, partition_key: partition_key.to_string(), @@ -129,7 +133,10 @@ pub async fn handle_insert_item( sort_key: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let causal_context = req .headers() @@ -149,6 +156,7 @@ pub async fn handle_insert_item( .k2v .rpc .insert( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key.to_string(), sort_key.to_string(), @@ -169,7 +177,10 @@ pub async fn handle_delete_item( sort_key: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let causal_context = req .headers() @@ -185,6 +196,7 @@ pub async fn handle_delete_item( .k2v .rpc .insert( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key.to_string(), sort_key.to_string(), @@ -209,7 +221,10 @@ pub async fn handle_poll_item( timeout_secs: Option, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; let format = ReturnFormat::from(req)?; @@ -222,6 +237,7 @@ pub async fn handle_poll_item( .k2v .rpc .poll_item( + *bucket_params.consistency_mode.get(), *bucket_id, partition_key, sort_key, diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index bb9d3be5..4fd5ba04 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -4,6 +4,7 @@ use std::sync::Arc; +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -22,6 +23,7 @@ pub(crate) async fn read_range( limit: Option, filter: Option, enumeration_order: EnumerationOrder, + c: ConsistencyMode, ) -> Result<(Vec, bool, Option), Error> where F: TableSchema + 'static, @@ -54,6 +56,7 @@ where ); let get_ret = table .get_range( + c, partition_key, start.clone(), filter.clone(), diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 6a12aa9c..1cf156b9 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -70,10 +70,10 @@ pub async fn handle_list_buckets( let mut aliases = HashMap::new(); for bucket_id in ids.iter() { - let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?; + let bucket = garage.bucket_table.get((), &EmptyKey, bucket_id).await?; if let Some(bucket) = bucket { for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) { - let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?; + let alias_opt = garage.bucket_alias_table.get((), &EmptyKey, alias).await?; if let Some(alias_ent) = alias_opt { if *alias_ent.state.get() == Some(*bucket_id) { aliases.insert(alias_ent.name().to_string(), *bucket_id); @@ -187,7 +187,7 @@ pub async fn handle_create_bucket( } let bucket = Bucket::new(); - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; helper .set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS) @@ -268,7 +268,7 @@ pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result, Erro state: Deletable::delete(), }; // 3. delete bucket - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert((), &bucket).await?; } else if is_local_alias { // Just unalias helper diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 3c2bd483..c1160f87 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -9,6 +9,7 @@ use hyper::{Request, Response}; use serde::Serialize; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; @@ -33,13 +34,15 @@ pub async fn handle_copy( ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let source_object = get_copy_source(&ctx, req).await?; + let (source_object, source_c) = get_copy_source(&ctx, req).await?; let ReqCtx { garage, bucket_id: dest_bucket_id, + bucket_params: dest_bucket_params, .. } = ctx; + let dest_c = *dest_bucket_params.consistency_mode.get(); let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -80,13 +83,13 @@ pub async fn handle_copy( dest_key.to_string(), vec![dest_object_version], ); - garage.object_table.insert(&dest_object).await?; + garage.object_table.insert(dest_c, &dest_object).await?; } ObjectVersionData::FirstBlock(_meta, first_block_hash) => { // Get block list from source version let source_version = garage .version_table - .get(&source_version.uuid, &EmptyKey) + .get(source_c, &source_version.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NoSuchKey)?; @@ -106,7 +109,7 @@ pub async fn handle_copy( dest_key.to_string(), vec![tmp_dest_object_version], ); - garage.object_table.insert(&tmp_dest_object).await?; + garage.object_table.insert(dest_c, &tmp_dest_object).await?; // Write version in the version table. Even with empty block list, // this means that the BlockRef entries linked to this version cannot be @@ -120,7 +123,7 @@ pub async fn handle_copy( }, false, ); - garage.version_table.insert(&dest_version).await?; + garage.version_table.insert(dest_c, &dest_version).await?; // Fill in block list for version and insert block refs for (bk, bv) in source_version.blocks.items().iter() { @@ -137,8 +140,10 @@ pub async fn handle_copy( }) .collect::>(); futures::try_join!( - garage.version_table.insert(&dest_version), - garage.block_ref_table.insert_many(&dest_block_refs[..]), + garage.version_table.insert(dest_c, &dest_version), + garage + .block_ref_table + .insert_many(dest_c, &dest_block_refs[..]), )?; // Insert final object @@ -160,7 +165,7 @@ pub async fn handle_copy( dest_key.to_string(), vec![dest_object_version], ); - garage.object_table.insert(&dest_object).await?; + garage.object_table.insert(dest_c, &dest_object).await?; } } @@ -193,12 +198,17 @@ pub async fn handle_upload_part_copy( let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( + let ((source_object, source_c), (_, _, mut dest_mpu)) = futures::try_join!( get_copy_source(&ctx, req), multipart::get_upload(&ctx, &dest_key, &dest_upload_id) )?; - let ReqCtx { garage, .. } = ctx; + let ReqCtx { + garage, + bucket_params: dest_bucket_params, + .. + } = ctx; + let dest_c = *dest_bucket_params.consistency_mode.get(); let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -244,7 +254,7 @@ pub async fn handle_upload_part_copy( // and destination version to check part hasn't yet been uploaded let source_version = garage .version_table - .get(&source_object_version.uuid, &EmptyKey) + .get(source_c, &source_object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -304,7 +314,7 @@ pub async fn handle_upload_part_copy( size: None, }, ); - garage.mpu_table.insert(&dest_mpu).await?; + garage.mpu_table.insert(dest_c, &dest_mpu).await?; let mut dest_version = Version::new( dest_version_id, @@ -390,7 +400,7 @@ pub async fn handle_upload_part_copy( if must_upload { garage2 .block_manager - .rpc_put_block(final_hash, data, None) + .rpc_put_block(dest_c, final_hash, data, None) .await } else { Ok(()) @@ -398,9 +408,9 @@ pub async fn handle_upload_part_copy( }, async { // Thing 2: we need to insert the block in the version - garage.version_table.insert(&dest_version).await?; + garage.version_table.insert(dest_c, &dest_version).await?; // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref).await + garage.block_ref_table.insert(dest_c, &block_ref).await }, // Thing 4: we need to prefetch the next block defragmenter.next(), @@ -422,7 +432,7 @@ pub async fn handle_upload_part_copy( size: Some(current_offset), }, ); - garage.mpu_table.insert(&dest_mpu).await?; + garage.mpu_table.insert(dest_c, &dest_mpu).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { @@ -440,24 +450,33 @@ pub async fn handle_upload_part_copy( .body(string_body(resp_xml))?) } -async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result { +async fn get_copy_source( + ctx: &ReqCtx, + req: &Request, +) -> Result<(Object, ConsistencyMode), Error> { let ReqCtx { garage, api_key, .. } = ctx; - let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; - let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; + let (source_bucket_name, source_key) = parse_bucket_key(©_source, None)?; let source_bucket_id = garage .bucket_helper() - .resolve_bucket(&source_bucket.to_string(), api_key) + .resolve_bucket(&source_bucket_name.to_string(), api_key) .await?; + let source_bucket = garage + .bucket_helper() + .get_existing_bucket(source_bucket_id) + .await?; + let source_bucket_state = source_bucket.state.as_option().unwrap(); + let source_c = *source_bucket_state.consistency_mode.get(); + if !api_key.allow_read(&source_bucket_id) { return Err(Error::forbidden(format!( "Reading from bucket {} not allowed for this key", - source_bucket + source_bucket_name ))); } @@ -465,11 +484,11 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result Result, Error> bucket_params.cors_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -91,7 +91,7 @@ pub async fn handle_put_cors( .update(Some(conf.into_garage_cors_config()?)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 57f6f948..8f9152cb 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -14,11 +14,16 @@ use crate::signature::verify_signed_content; async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(bucket_id, &key.to_string()) + .get(c, bucket_id, &key.to_string()) .await? .ok_or(Error::NoSuchKey)?; // No need to delete @@ -49,7 +54,7 @@ async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), }], ); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; Ok((deleted_version, del_uuid)) } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index ed996fb1..90560e6b 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -14,11 +14,13 @@ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_net::stream::ByteStream; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; +use garage_model::bucket_table::BucketParams; use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -136,7 +138,15 @@ pub async fn handle_head( key: &str, part_number: Option, ) -> Result, Error> { - handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await + handle_head_without_ctx( + ctx.garage, + req, + ctx.bucket_id, + &ctx.bucket_params, + key, + part_number, + ) + .await } /// Handle HEAD request for website @@ -144,12 +154,15 @@ pub async fn handle_head_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, + bucket_params: &BucketParams, key: &str, part_number: Option, ) -> Result, Error> { + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(&bucket_id, &key.to_string()) + .get(c, &bucket_id, &key.to_string()) .await? .ok_or(Error::NoSuchKey)?; @@ -194,7 +207,7 @@ pub async fn handle_head_without_ctx( ObjectVersionData::FirstBlock(_, _) => { let version = garage .version_table - .get(&object_version.uuid, &EmptyKey) + .get(c, &object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -234,7 +247,16 @@ pub async fn handle_get( part_number: Option, overrides: GetObjectOverrides, ) -> Result, Error> { - handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await + handle_get_without_ctx( + ctx.garage, + req, + ctx.bucket_id, + &ctx.bucket_params, + key, + part_number, + overrides, + ) + .await } /// Handle GET request @@ -242,13 +264,16 @@ pub async fn handle_get_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, + bucket_params: &BucketParams, key: &str, part_number: Option, overrides: GetObjectOverrides, ) -> Result, Error> { + let c = *bucket_params.consistency_mode.get(); + let object = garage .object_table - .get(&bucket_id, &key.to_string()) + .get(c, &bucket_id, &key.to_string()) .await? .ok_or(Error::NoSuchKey)?; @@ -277,10 +302,11 @@ pub async fn handle_get_without_ctx( (Some(_), Some(_)) => Err(Error::bad_request( "Cannot specify both partNumber and Range header", )), - (Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, + (Some(pn), None) => handle_get_part(garage, c, last_v, last_v_data, last_v_meta, pn).await, (None, Some(range)) => { handle_get_range( garage, + c, last_v, last_v_data, last_v_meta, @@ -289,12 +315,15 @@ pub async fn handle_get_without_ctx( ) .await } - (None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, + (None, None) => { + handle_get_full(garage, c, last_v, last_v_data, last_v_meta, overrides).await + } } } async fn handle_get_full( garage: Arc, + c: ConsistencyMode, version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -321,7 +350,7 @@ async fn handle_get_full( match async { let garage2 = garage.clone(); let version_fut = tokio::spawn(async move { - garage2.version_table.get(&version_uuid, &EmptyKey).await + garage2.version_table.get(c, &version_uuid, &EmptyKey).await }); let stream_block_0 = garage @@ -362,6 +391,7 @@ async fn handle_get_full( async fn handle_get_range( garage: Arc, + c: ConsistencyMode, version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -394,7 +424,7 @@ async fn handle_get_range( ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { let version = garage .version_table - .get(&version.uuid, &EmptyKey) + .get(c, &version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; @@ -406,6 +436,7 @@ async fn handle_get_range( async fn handle_get_part( garage: Arc, + c: ConsistencyMode, object_version: &ObjectVersion, version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, @@ -432,7 +463,7 @@ async fn handle_get_part( ObjectVersionData::FirstBlock(_, _) => { let version = garage .version_table - .get(&object_version.uuid, &EmptyKey) + .get(c, &object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 7eb1c2cb..6a0f19ff 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -44,7 +44,7 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result, E bucket_params.lifecycle_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -78,7 +78,7 @@ pub async fn handle_put_lifecycle( bucket_params.lifecycle_config.update(Some(config)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 302c03f4..70087024 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -63,11 +63,17 @@ pub async fn handle_list( ctx: ReqCtx, query: &ListObjectsQuery, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = &ctx; + let c = *bucket_params.consistency_mode.get(); let io = |bucket, key, count| { let t = &garage.object_table; async move { t.get_range( + c, &bucket, key, Some(ObjectFilter::IsData), @@ -169,12 +175,18 @@ pub async fn handle_list_multipart_upload( ctx: ReqCtx, query: &ListMultipartUploadsQuery, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = &ctx; + let c = *bucket_params.consistency_mode.get(); let io = |bucket, key, count| { let t = &garage.object_table; async move { t.get_range( + c, &bucket, key, Some(ObjectFilter::IsUploading { diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 1d5aeb26..af9f8418 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -5,6 +5,7 @@ use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::*; use garage_util::data::*; @@ -32,9 +33,12 @@ pub async fn handle_create_multipart_upload( garage, bucket_id, bucket_name, + bucket_params, .. } = &ctx; - let existing_object = garage.object_table.get(&bucket_id, &key).await?; + let c = *bucket_params.consistency_mode.get(); + + let existing_object = garage.object_table.get(c, &bucket_id, key).await?; let upload_id = gen_uuid(); let timestamp = next_timestamp(existing_object.as_ref()); @@ -51,13 +55,13 @@ pub async fn handle_create_multipart_upload( }, }; let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; // Send success response let result = s3_xml::InitiateMultipartUploadResult { @@ -79,7 +83,12 @@ pub async fn handle_put_part( upload_id: &str, content_sha256: Option, ) -> Result, Error> { - let ReqCtx { garage, .. } = &ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = &ctx; + let c = *bucket_params.consistency_mode.get(); let upload_id = decode_upload_id(upload_id)?; @@ -112,6 +121,7 @@ pub async fn handle_put_part( // before everything is finished (cleanup is done using the Drop trait). let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), + consistency_mode: c, upload_id, version_uuid, })); @@ -126,14 +136,14 @@ pub async fn handle_put_part( size: None, }, ); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; let version = Version::new( version_uuid, VersionBacklink::MultipartUpload { upload_id }, false, ); - garage.version_table.insert(&version).await?; + garage.version_table.insert(c, &version).await?; // Copy data to version let (total_size, data_md5sum, data_sha256sum, _) = @@ -157,7 +167,7 @@ pub async fn handle_put_part( size: Some(total_size), }, ); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(c, &mpu).await?; // We were not interrupted, everything went fine. // We won't have to clean up on drop. @@ -173,6 +183,7 @@ pub async fn handle_put_part( struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, + consistency_mode: ConsistencyMode, upload_id: Uuid, version_uuid: Uuid, } @@ -193,7 +204,12 @@ impl Drop for InterruptedCleanup { }, true, ); - if let Err(e) = info.garage.version_table.insert(&version).await { + if let Err(e) = info + .garage + .version_table + .insert(info.consistency_mode, &version) + .await + { warn!("Cannot cleanup after aborted UploadPart: {}", e); } }); @@ -212,8 +228,10 @@ pub async fn handle_complete_multipart_upload( garage, bucket_id, bucket_name, + bucket_params, .. } = &ctx; + let c = *bucket_params.consistency_mode.get(); let body = http_body_util::BodyExt::collect(req.into_body()) .await? @@ -279,7 +297,7 @@ pub async fn handle_complete_multipart_upload( let grg = &garage; let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move { grg.version_table - .get(&p.version, &EmptyKey) + .get(c, &p.version, &EmptyKey) .await? .ok_or_internal_error("Part version missing from version table") })) @@ -308,14 +326,14 @@ pub async fn handle_complete_multipart_upload( ); } } - garage.version_table.insert(&final_version).await?; + garage.version_table.insert(c, &final_version).await?; let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef { block: b.hash, version: upload_id, deleted: false.into(), }); - garage.block_ref_table.insert_many(block_refs).await?; + garage.block_ref_table.insert_many(c, block_refs).await?; // Calculate etag of final object // To understand how etags are calculated, read more here: @@ -336,7 +354,7 @@ pub async fn handle_complete_multipart_upload( if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; return Err(e); } @@ -352,7 +370,7 @@ pub async fn handle_complete_multipart_upload( )); let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { @@ -373,8 +391,12 @@ pub async fn handle_abort_multipart_upload( upload_id: &str, ) -> Result, Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = &ctx; + let c = *bucket_params.consistency_mode.get(); let upload_id = decode_upload_id(upload_id)?; @@ -382,7 +404,7 @@ pub async fn handle_abort_multipart_upload( object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&final_object).await?; + garage.object_table.insert(c, &final_object).await?; Ok(Response::new(empty_body())) } @@ -396,13 +418,21 @@ pub(crate) async fn get_upload( upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); + let (object, mpu) = futures::try_join!( - garage.object_table.get(bucket_id, key).map_err(Error::from), + garage + .object_table + .get(c, bucket_id, key) + .map_err(Error::from), garage .mpu_table - .get(upload_id, &EmptyKey) + .get(c, upload_id, &EmptyKey) .map_err(Error::from), )?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index f06aa7a2..b235c1b8 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; @@ -71,13 +72,20 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256: Option, ) -> Result<(Uuid, String), Error> { let ReqCtx { - garage, bucket_id, .. + garage, + bucket_id, + bucket_params, + .. } = ctx; + let c = *bucket_params.consistency_mode.get(); let mut chunker = StreamChunker::new(body, garage.config.block_size); let (first_block_opt, existing_object) = try_join!( chunker.next(), - garage.object_table.get(bucket_id, key).map_err(Error::from), + garage + .object_table + .get(c, bucket_id, key) + .map_err(Error::from), )?; let first_block = first_block_opt.unwrap_or_default(); @@ -120,7 +128,7 @@ pub(crate) async fn save_stream> + Unpin>( }; let object = Object::new(*bucket_id, key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; return Ok((version_uuid, data_md5sum_hex)); } @@ -131,6 +139,7 @@ pub(crate) async fn save_stream> + Unpin>( let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), bucket_id: *bucket_id, + consistency_mode: c, key: key.into(), version_uuid, version_timestamp, @@ -147,7 +156,7 @@ pub(crate) async fn save_stream> + Unpin>( }, }; let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, @@ -161,7 +170,7 @@ pub(crate) async fn save_stream> + Unpin>( }, false, ); - garage.version_table.insert(&version).await?; + garage.version_table.insert(c, &version).await?; // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = @@ -187,7 +196,7 @@ pub(crate) async fn save_stream> + Unpin>( first_block_hash, )); let object = Object::new(*bucket_id, key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; + garage.object_table.insert(c, &object).await?; // We were not interrupted, everything went fine. // We won't have to clean up on drop. @@ -235,6 +244,7 @@ pub(crate) async fn check_quotas( bucket_params, .. } = ctx; + let c = *bucket_params.consistency_mode.get(); let quotas = bucket_params.quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { @@ -244,7 +254,7 @@ pub(crate) async fn check_quotas( let counters = garage .object_counter_table .table - .get(bucket_id, &EmptyKey) + .get(c, bucket_id, &EmptyKey) .await?; let counters = counters @@ -451,7 +461,12 @@ async fn put_block_and_meta( block: Bytes, order_tag: OrderTag, ) -> Result<(), GarageError> { - let ReqCtx { garage, .. } = ctx; + let ReqCtx { + garage, + bucket_params, + .. + } = ctx; + let c = *bucket_params.consistency_mode.get(); let mut version = version.clone(); version.blocks.put( @@ -474,9 +489,9 @@ async fn put_block_and_meta( futures::try_join!( garage .block_manager - .rpc_put_block(hash, block, Some(order_tag)), - garage.version_table.insert(&version), - garage.block_ref_table.insert(&block_ref), + .rpc_put_block(c, hash, block, Some(order_tag)), + garage.version_table.insert(c, &version), + garage.block_ref_table.insert(c, &block_ref), )?; Ok(()) } @@ -529,6 +544,7 @@ struct InterruptedCleanup(Option); struct InterruptedCleanupInner { garage: Arc, bucket_id: Uuid, + consistency_mode: ConsistencyMode, key: String, version_uuid: Uuid, version_timestamp: u64, @@ -549,7 +565,12 @@ impl Drop for InterruptedCleanup { state: ObjectVersionState::Aborted, }; let object = Object::new(info.bucket_id, info.key, vec![object_version]); - if let Err(e) = info.garage.object_table.insert(&object).await { + if let Err(e) = info + .garage + .object_table + .insert(info.consistency_mode, &object) + .await + { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 6af55677..3f2da68a 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -49,7 +49,7 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result, Err bucket_params.website_config.update(None); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() @@ -83,7 +83,7 @@ pub async fn handle_put_website( .update(Some(conf.into_garage_website_config()?)); garage .bucket_table - .insert(&Bucket::present(bucket_id, bucket_params)) + .insert((), &Bucket::present(bucket_id, bucket_params)) .await?; Ok(Response::builder() diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index d72736bb..f34ce186 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -370,7 +370,7 @@ pub async fn verify_v4( let key = garage .key_table - .get(&EmptyKey, &auth.key_id) + .get((), &EmptyKey, &auth.key_id) .await? .filter(|k| !k.state.is_deleted()) .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?; diff --git a/src/block/manager.rs b/src/block/manager.rs index 218ef9eb..c8fc40cf 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -29,6 +29,7 @@ use garage_util::metrics::RecordDuration; use garage_util::persister::{Persister, PersisterShared}; use garage_util::time::msec_to_rfc3339; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; @@ -343,6 +344,7 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block( &self, + consistency_mode: ConsistencyMode, hash: Hash, data: Bytes, order_tag: Option, @@ -367,7 +369,7 @@ impl BlockManager { who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) - .with_quorum(self.system.layout_manager.write_quorum()), + .with_quorum(self.system.layout_manager.write_quorum(consistency_mode)), ) .await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index 180e7bcf..62c3ed21 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -380,7 +380,12 @@ impl BlockResyncManager { .layout_manager .layout() .storage_nodes_of(hash); - if who.len() < manager.system.layout_manager.write_quorum() { + if who.len() + < manager + .system + .layout_manager + .write_quorum(Default::default()) + { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } who.retain(|id| *id != manager.system.id); diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index edeb88c0..7369ff85 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -30,7 +30,14 @@ impl AdminRpcHandler { let block_refs = self .garage .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) + .get_range( + Default::default(), + &hash, + None, + None, + 10000, + Default::default(), + ) .await?; let mut versions = vec![]; let mut uploads = vec![]; @@ -38,11 +45,16 @@ impl AdminRpcHandler { if let Some(v) = self .garage .version_table - .get(&br.version, &EmptyKey) + .get(Default::default(), &br.version, &EmptyKey) .await? { if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { - if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if let Some(u) = self + .garage + .mpu_table + .get(Default::default(), upload_id, &EmptyKey) + .await? + { uploads.push(u); } } @@ -108,14 +120,21 @@ impl AdminRpcHandler { let block_refs = self .garage .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) + .get_range( + Default::default(), + &hash, + None, + None, + 10000, + Default::default(), + ) .await?; for br in block_refs { if let Some(version) = self .garage .version_table - .get(&br.version, &EmptyKey) + .get(Default::default(), &br.version, &EmptyKey) .await? { self.handle_block_purge_version_backlink( @@ -127,7 +146,10 @@ impl AdminRpcHandler { if !version.deleted.get() { let deleted_version = Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; + self.garage + .version_table + .insert(Default::default(), &deleted_version) + .await?; ver_dels += 1; } } @@ -152,11 +174,19 @@ impl AdminRpcHandler { let (bucket_id, key, ov_id) = match &version.backlink { VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if let Some(mut mpu) = self + .garage + .mpu_table + .get(Default::default(), upload_id, &EmptyKey) + .await? + { if !mpu.deleted.get() { mpu.parts.clear(); mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; + self.garage + .mpu_table + .insert(Default::default(), &mpu) + .await?; *mpu_dels += 1; } (mpu.bucket_id, mpu.key.clone(), *upload_id) @@ -166,7 +196,12 @@ impl AdminRpcHandler { } }; - if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { + if let Some(object) = self + .garage + .object_table + .get(Default::default(), &bucket_id, &key) + .await? + { let ov = object.versions().iter().rev().find(|v| v.is_complete()); if let Some(ov) = ov { if ov.uuid == ov_id { @@ -180,7 +215,10 @@ impl AdminRpcHandler { state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); - self.garage.object_table.insert(&deleted_object).await?; + self.garage + .object_table + .insert(Default::default(), &deleted_object) + .await?; *obj_dels += 1; } } diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 6b1190f8..eb9fde77 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -39,6 +39,7 @@ impl AdminRpcHandler { .garage .bucket_table .get_range( + (), &EmptyKey, None, Some(DeletedFilter::NotDeleted), @@ -63,12 +64,14 @@ impl AdminRpcHandler { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.as_option().unwrap(); + let c = *bucket_params.consistency_mode.get(); let counters = self .garage .object_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(c, &bucket_id, &EmptyKey) .await? .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); @@ -77,42 +80,28 @@ impl AdminRpcHandler { .garage .mpu_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(c, &bucket_id, &EmptyKey) .await? .map(|x| x.filtered_values(&self.garage.system.cluster_layout())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); - for (k, _) in bucket - .state - .as_option() - .unwrap() - .authorized_keys - .items() - .iter() - { + for (k, _) in bucket_params.authorized_keys.items().iter() { if let Some(key) = self .garage .key_table - .get(&EmptyKey, k) + .get((), &EmptyKey, k) .await? .filter(|k| !k.is_deleted()) { relevant_keys.insert(k.clone(), key); } } - for ((k, _), _, _) in bucket - .state - .as_option() - .unwrap() - .local_aliases - .items() - .iter() - { + for ((k, _), _, _) in bucket_params.local_aliases.items().iter() { if relevant_keys.contains_key(k) { continue; } - if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? { + if let Some(key) = self.garage.key_table.get((), &EmptyKey, k).await? { relevant_keys.insert(k.clone(), key); } } @@ -136,7 +125,12 @@ impl AdminRpcHandler { let helper = self.garage.locked_helper().await; - if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? { + if let Some(alias) = self + .garage + .bucket_alias_table + .get((), &EmptyKey, name) + .await? + { if alias.state.get().is_some() { return Err(Error::BadRequest(format!("Bucket {} already exists", name))); } @@ -145,7 +139,7 @@ impl AdminRpcHandler { // ---- done checking, now commit ---- let bucket = Bucket::new(); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; helper.set_global_bucket_alias(bucket.id, name).await?; @@ -170,7 +164,7 @@ impl AdminRpcHandler { let bucket_alias = self .garage .bucket_alias_table - .get(&EmptyKey, &query.name) + .get((), &EmptyKey, &query.name) .await?; // Check bucket doesn't have other aliases @@ -225,7 +219,7 @@ impl AdminRpcHandler { // 3. delete bucket bucket.state = Deletable::delete(); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) } @@ -405,7 +399,7 @@ impl AdminRpcHandler { }; bucket_state.website_config.update(website); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; let msg = if query.allow { format!("Website access allowed for {}", &query.bucket) @@ -462,7 +456,7 @@ impl AdminRpcHandler { } bucket_state.quotas.update(quotas); - self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_table.insert((), &bucket).await?; Ok(AdminRpc::Ok(format!( "Quotas updated for {}", diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs index bd010d2c..66c9f8ed 100644 --- a/src/garage/admin/key.rs +++ b/src/garage/admin/key.rs @@ -28,6 +28,7 @@ impl AdminRpcHandler { .garage .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), @@ -57,7 +58,7 @@ impl AdminRpcHandler { async fn handle_create_key(&self, query: &KeyNewOpt) -> Result { let key = Key::new(&query.name); - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -71,7 +72,7 @@ impl AdminRpcHandler { .unwrap() .name .update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -106,7 +107,7 @@ impl AdminRpcHandler { if query.create_bucket { key.params_mut().unwrap().allow_create_bucket.update(true); } - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -119,7 +120,7 @@ impl AdminRpcHandler { if query.create_bucket { key.params_mut().unwrap().allow_create_bucket.update(false); } - self.garage.key_table.insert(&key).await?; + self.garage.key_table.insert((), &key).await?; self.key_info_result(key).await } @@ -128,14 +129,18 @@ impl AdminRpcHandler { return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string())); } - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + let prev_key = self + .garage + .key_table + .get((), &EmptyKey, &query.key_id) + .await?; if prev_key.is_some() { return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); } let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name) .ok_or_bad_request("Invalid key format")?; - self.garage.key_table.insert(&imported_key).await?; + self.garage.key_table.insert((), &imported_key).await?; self.key_info_result(imported_key).await } @@ -151,7 +156,7 @@ impl AdminRpcHandler { .items() .iter() { - if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? { + if let Some(b) = self.garage.bucket_table.get((), &EmptyKey, id).await? { relevant_buckets.insert(*id, b); } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 9e4de873..84d1038e 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -176,7 +176,7 @@ impl TableRepair for RepairVersions { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage .object_table - .get(bucket_id, key) + .get(Default::default(), bucket_id, key) .await? .map(|o| { o.versions().iter().any(|x| { @@ -186,7 +186,7 @@ impl TableRepair for RepairVersions { .unwrap_or(false), VersionBacklink::MultipartUpload { upload_id } => garage .mpu_table - .get(upload_id, &EmptyKey) + .get(Default::default(), upload_id, &EmptyKey) .await? .map(|u| !u.deleted.get()) .unwrap_or(false), @@ -196,7 +196,10 @@ impl TableRepair for RepairVersions { info!("Repair versions: marking version as deleted: {:?}", version); garage .version_table - .insert(&Version::new(version.uuid, version.backlink, true)) + .insert( + Default::default(), + &Version::new(version.uuid, version.backlink, true), + ) .await?; return Ok(true); } @@ -222,7 +225,7 @@ impl TableRepair for RepairBlockRefs { if !block_ref.deleted.get() { let ref_exists = garage .version_table - .get(&block_ref.version, &EmptyKey) + .get(Default::default(), &block_ref.version, &EmptyKey) .await? .map(|v| !v.deleted.get()) .unwrap_or(false); @@ -233,7 +236,10 @@ impl TableRepair for RepairBlockRefs { block_ref ); block_ref.deleted.set(); - garage.block_ref_table.insert(&block_ref).await?; + garage + .block_ref_table + .insert(Default::default(), &block_ref) + .await?; return Ok(true); } } @@ -258,7 +264,7 @@ impl TableRepair for RepairMpu { if !mpu.deleted.get() { let ref_exists = garage .object_table - .get(&mpu.bucket_id, &mpu.key) + .get(Default::default(), &mpu.bucket_id, &mpu.key) .await? .map(|o| { o.versions() @@ -274,7 +280,7 @@ impl TableRepair for RepairMpu { ); mpu.parts.clear(); mpu.deleted.set(); - garage.mpu_table.insert(&mpu).await?; + garage.mpu_table.insert(Default::default(), &mpu).await?; return Ok(true); } } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 1dbdfac2..d964b9fd 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,3 +1,4 @@ +use garage_rpc::replication_mode::ConsistencyMode; use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -119,7 +120,94 @@ mod v08 { impl garage_util::migrate::InitialFormat for Bucket {} } -pub use v08::*; +mod v011 { + use super::v08; + pub use super::v08::{ + BucketQuotas, CorsRule, LifecycleExpiration, LifecycleFilter, LifecycleRule, WebsiteConfig, + }; + use crate::permission::BucketKeyPerm; + use garage_rpc::replication_mode::ConsistencyMode; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// A bucket is a collection of objects + /// + /// Its parameters are not directly accessible as: + /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. + /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Bucket { + /// ID of the bucket + pub id: Uuid, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Deletable, + } + + /// Configuration for a bucket + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::Map, + + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, + + /// Whether to enable read-after-write consistency for this bucket + pub consistency_mode: crdt::Lww, + /// Whether this bucket is allowed for website access + /// (under all of its global alias names), + /// and if so, the website configuration XML document + pub website_config: crdt::Lww>, + /// CORS rules + pub cors_config: crdt::Lww>>, + /// Lifecycle configuration + #[serde(default)] + pub lifecycle_config: crdt::Lww>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww, + } + + impl garage_util::migrate::Migrate for Bucket { + const VERSION_MARKER: &'static [u8] = b"G011lh"; + + type Previous = v08::Bucket; + + fn migrate(previous: Self::Previous) -> Self { + Self { + id: previous.id, + state: match previous.state { + crdt::Deletable::Present(prev_state) => { + crdt::Deletable::Present(BucketParams { + creation_date: prev_state.creation_date, + authorized_keys: prev_state.authorized_keys, + aliases: prev_state.aliases, + local_aliases: prev_state.local_aliases, + website_config: prev_state.website_config, + cors_config: prev_state.cors_config, + lifecycle_config: prev_state.lifecycle_config, + quotas: prev_state.quotas, + consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent), + }) + } + crdt::Deletable::Deleted => crdt::Deletable::Deleted, + }, + } + } + } +} + +pub use v011::*; impl AutoCrdt for BucketQuotas { const WARN_IF_DIFFERENT: bool = true; @@ -133,6 +221,7 @@ impl BucketParams { authorized_keys: crdt::Map::new(), aliases: crdt::LwwMap::new(), local_aliases: crdt::LwwMap::new(), + consistency_mode: crdt::Lww::new(ConsistencyMode::Consistent), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), lifecycle_config: crdt::Lww::new(None), diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index a712d683..aec60de9 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -36,7 +36,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .filter(|x| !x.state.is_deleted()) .map(|_| bucket_id)) @@ -44,7 +44,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_alias_table - .get(&EmptyKey, bucket_name) + .get((), &EmptyKey, bucket_name) .await? .and_then(|x| *x.state.get())) } @@ -74,7 +74,7 @@ impl<'a> BucketHelper<'a> { Ok(self .0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .ok_or_message(format!("Bucket {:?} does not exist", bucket_id))?) } @@ -86,7 +86,7 @@ impl<'a> BucketHelper<'a> { pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result { self.0 .bucket_table - .get(&EmptyKey, &bucket_id) + .get((), &EmptyKey, &bucket_id) .await? .filter(|b| !b.is_deleted()) .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id))) @@ -95,10 +95,20 @@ impl<'a> BucketHelper<'a> { // ---- pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result { + let consistency_mode = *self + .get_existing_bucket(bucket_id) + .await? + .state + .as_option() + .unwrap() + .consistency_mode + .get(); + let objects = self .0 .object_table .get_range( + consistency_mode, &bucket_id, None, Some(ObjectFilter::IsData), @@ -124,6 +134,7 @@ impl<'a> BucketHelper<'a> { .counter_table .table .get_range( + consistency_mode, &bucket_id, None, Some((DeletedFilter::NotDeleted, node_id_vec)), @@ -151,6 +162,12 @@ impl<'a> BucketHelper<'a> { older_than: Duration, ) -> Result { let older_than = now_msec() - older_than.as_millis() as u64; + let consistency_mode = self + .get_existing_bucket(*bucket_id) + .await? + .params() + .map(|params| *params.consistency_mode.get()) + .unwrap_or_default(); let mut ret = 0usize; let mut start = None; @@ -160,6 +177,7 @@ impl<'a> BucketHelper<'a> { .0 .object_table .get_range( + consistency_mode, bucket_id, start, Some(ObjectFilter::IsUploading { @@ -196,7 +214,10 @@ impl<'a> BucketHelper<'a> { .collect::>(); ret += abortions.len(); - self.0.object_table.insert_many(abortions).await?; + self.0 + .object_table + .insert_many(consistency_mode, abortions) + .await?; if objects.len() < 1000 { break; diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs index b8a99d55..203b2141 100644 --- a/src/model/helper/key.rs +++ b/src/model/helper/key.rs @@ -16,7 +16,7 @@ impl<'a> KeyHelper<'a> { Ok(self .0 .key_table - .get(&EmptyKey, key_id) + .get((), &EmptyKey, key_id) .await? .ok_or_message(format!("Key {} does not exist", key_id))?) } @@ -28,7 +28,7 @@ impl<'a> KeyHelper<'a> { pub async fn get_existing_key(&self, key_id: &String) -> Result { self.0 .key_table - .get(&EmptyKey, key_id) + .get((), &EmptyKey, key_id) .await? .filter(|b| !b.state.is_deleted()) .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string())) @@ -44,6 +44,7 @@ impl<'a> KeyHelper<'a> { .0 .key_table .get_range( + (), &EmptyKey, None, Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs index f8e06add..d1e8adc9 100644 --- a/src/model/helper/locked.rs +++ b/src/model/helper/locked.rs @@ -56,7 +56,11 @@ impl<'a> LockedHelper<'a> { let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?; - let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?; + let alias = self + .0 + .bucket_alias_table + .get((), &EmptyKey, alias_name) + .await?; if let Some(existing_alias) = alias.as_ref() { if let Some(p_bucket) = existing_alias.state.get() { @@ -88,10 +92,10 @@ impl<'a> LockedHelper<'a> { a } }; - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -112,7 +116,7 @@ impl<'a> LockedHelper<'a> { let mut alias = self .0 .bucket_alias_table - .get(&EmptyKey, alias_name) + .get((), &EmptyKey, alias_name) .await? .filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false)) .ok_or_message(format!( @@ -144,10 +148,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts alias.state = Lww::raw(alias_ts, None); - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -168,7 +172,7 @@ impl<'a> LockedHelper<'a> { let mut alias = self .0 .bucket_alias_table - .get(&EmptyKey, alias_name) + .get((), &EmptyKey, alias_name) .await? .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?; @@ -186,12 +190,12 @@ impl<'a> LockedHelper<'a> { if alias.state.get() == &Some(bucket_id) { alias.state = Lww::raw(alias_ts, None); - self.0.bucket_alias_table.insert(&alias).await?; + self.0.bucket_alias_table.insert((), &alias).await?; } if let Some(bucket_state) = bucket.state.as_option_mut() { bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; } Ok(()) @@ -245,10 +249,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id)); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -317,10 +321,10 @@ impl<'a> LockedHelper<'a> { // writes are now done and all writes use timestamp alias_ts key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; Ok(()) } @@ -365,12 +369,12 @@ impl<'a> LockedHelper<'a> { if let Some(bstate) = bucket.state.as_option_mut() { bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm); - self.0.bucket_table.insert(&bucket).await?; + self.0.bucket_table.insert((), &bucket).await?; } if let Some(kstate) = key.state.as_option_mut() { kstate.authorized_buckets = Map::put_mutator(bucket_id, perm); - self.0.key_table.insert(&key).await?; + self.0.key_table.insert((), &key).await?; } Ok(()) @@ -403,7 +407,7 @@ impl<'a> LockedHelper<'a> { // 3. Actually delete key key.state = Deletable::delete(); - self.0.key_table.insert(key).await?; + self.0.key_table.insert((), key).await?; Ok(()) } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index e15f2df8..140d52a6 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -23,6 +23,7 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::time::now_msec; +use garage_rpc::replication_mode::ConsistencyMode; use garage_rpc::system::System; use garage_rpc::*; @@ -43,8 +44,8 @@ const TIMESTAMP_KEY: &[u8] = b"timestamp"; #[derive(Debug, Serialize, Deserialize)] enum K2VRpc { Ok, - InsertItem(InsertedItem), - InsertManyItems(Vec), + InsertItem(ConsistencyMode, InsertedItem), + InsertManyItems(ConsistencyMode, Vec), PollItem { key: PollKey, causal_context: CausalContext, @@ -113,6 +114,7 @@ impl K2VRpcHandler { pub async fn insert( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, partition_key: String, sort_key: String, @@ -135,12 +137,15 @@ impl K2VRpcHandler { .try_call_many( &self.endpoint, &who, - K2VRpc::InsertItem(InsertedItem { - partition, - sort_key, - causal_context, - value, - }), + K2VRpc::InsertItem( + consistency_mode, + InsertedItem { + partition, + sort_key, + causal_context, + value, + }, + ), RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -150,6 +155,7 @@ impl K2VRpcHandler { pub async fn insert_batch( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, items: Vec<(String, String, Option, DvvsValue)>, ) -> Result<(), Error> { @@ -189,7 +195,7 @@ impl K2VRpcHandler { .try_call_many( &self.endpoint, &nodes[..], - K2VRpc::InsertManyItems(items), + K2VRpc::InsertManyItems(consistency_mode, items), RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -206,6 +212,7 @@ impl K2VRpcHandler { pub async fn poll_item( &self, + consistency_mode: ConsistencyMode, bucket_id: Uuid, partition_key: String, sort_key: String, @@ -235,7 +242,12 @@ impl K2VRpcHandler { timeout_msec, }, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.item_table.data.replication.read_quorum()) + .with_quorum( + self.item_table + .data + .replication + .read_quorum(consistency_mode), + ) .send_all_at_once(true) .without_timeout(), ); @@ -266,6 +278,7 @@ impl K2VRpcHandler { pub async fn poll_range( &self, + consistency_mode: ConsistencyMode, range: PollRange, seen_str: Option, timeout_msec: u64, @@ -288,7 +301,11 @@ impl K2VRpcHandler { .data .replication .read_nodes(&range.partition.hash()); - let quorum = self.item_table.data.replication.read_quorum(); + let quorum = self + .item_table + .data + .replication + .read_quorum(consistency_mode); let msg = K2VRpc::PollRange { range, seen_str, @@ -376,7 +393,11 @@ impl K2VRpcHandler { // ---- internal handlers ---- - async fn handle_insert(&self, item: &InsertedItem) -> Result { + async fn handle_insert( + &self, + c: ConsistencyMode, + item: &InsertedItem, + ) -> Result { let new = { let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); self.local_insert(&local_timestamp_tree, item)? @@ -384,13 +405,17 @@ impl K2VRpcHandler { // Propagate to rest of network if let Some(updated) = new { - self.item_table.insert(&updated).await?; + self.item_table.insert(c, &updated).await?; } Ok(K2VRpc::Ok) } - async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { + async fn handle_insert_many( + &self, + c: ConsistencyMode, + items: &[InsertedItem], + ) -> Result { let mut updated_vec = vec![]; { @@ -406,7 +431,7 @@ impl K2VRpcHandler { // Propagate to rest of network if !updated_vec.is_empty() { - self.item_table.insert_many(&updated_vec).await?; + self.item_table.insert_many(c, &updated_vec).await?; } Ok(K2VRpc::Ok) @@ -546,8 +571,8 @@ impl K2VRpcHandler { impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { - K2VRpc::InsertItem(item) => self.handle_insert(item).await, - K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, + K2VRpc::InsertItem(c, item) => self.handle_insert(*c, item).await, + K2VRpc::InsertManyItems(c, items) => self.handle_insert_many(*c, &items[..]).await, K2VRpc::PollItem { key, causal_context, diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 8528382a..b03cf2fb 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use garage_rpc::replication_mode::ConsistencyMode; use garage_util::crdt::*; use garage_util::data::*; use garage_util::encode::nonversioned_decode; @@ -71,19 +72,23 @@ impl Migrate { self.garage .bucket_table - .insert(&Bucket { - id: bucket_id, - state: Deletable::Present(BucketParams { - creation_date: now_msec(), - authorized_keys: Map::new(), - aliases: LwwMap::new(), - local_aliases: LwwMap::new(), - website_config: Lww::new(website), - cors_config: Lww::new(None), - lifecycle_config: Lww::new(None), - quotas: Lww::new(Default::default()), - }), - }) + .insert( + (), + &Bucket { + id: bucket_id, + state: Deletable::Present(BucketParams { + creation_date: now_msec(), + authorized_keys: Map::new(), + aliases: LwwMap::new(), + local_aliases: LwwMap::new(), + consistency_mode: Lww::new(ConsistencyMode::Consistent), + website_config: Lww::new(website), + cors_config: Lww::new(None), + lifecycle_config: Lww::new(None), + quotas: Lww::new(Default::default()), + }), + }, + ) .await?; helper.set_global_bucket_alias(bucket_id, &new_name).await?; diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 50d4283f..9feb484d 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -253,7 +253,7 @@ async fn process_object( _ => { match garage .bucket_table - .get(&EmptyKey, &object.bucket_id) + .get((), &EmptyKey, &object.bucket_id) .await? { Some(b) => b, diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 846eea47..ef05415a 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -139,12 +139,14 @@ impl LayoutManager { } } - pub fn read_quorum(self: &Arc) -> usize { - self.replication_factor.read_quorum(self.consistency_mode) + pub fn read_quorum(self: &Arc, bucket_consistency_mode: ConsistencyMode) -> usize { + self.replication_factor + .read_quorum(bucket_consistency_mode.min(self.consistency_mode)) } - pub fn write_quorum(self: &Arc) -> usize { - self.replication_factor.write_quorum(self.consistency_mode) + pub fn write_quorum(self: &Arc, bucket_consistency_mode: ConsistencyMode) -> usize { + self.replication_factor + .write_quorum(bucket_consistency_mode.min(self.consistency_mode)) } // ---- ACK LOCKING ---- diff --git a/src/table/queue.rs b/src/table/queue.rs index ffe0a4a7..39f0a75f 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -51,7 +51,7 @@ impl Worker for InsertQueueWorker { return Ok(WorkerState::Idle); } - self.0.insert_many(values).await?; + self.0.insert_many(Default::default(), values).await?; self.0.data.insert_queue.db().transaction(|tx| { for (k, v) in kv_pairs.iter() { diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 1e52bb47..4dd76f10 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -25,6 +25,7 @@ pub struct TableFullReplication { impl TableReplication for TableFullReplication { type WriteSets = Vec>; + type ConsistencyParam = (); fn storage_nodes(&self, _hash: &Hash) -> Vec { let layout = self.system.cluster_layout(); @@ -34,14 +35,14 @@ impl TableReplication for TableFullReplication { fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } - fn read_quorum(&self) -> usize { + fn read_quorum(&self, _: ()) -> usize { 1 } fn write_sets(&self, hash: &Hash) -> Self::WriteSets { vec![self.storage_nodes(hash)] } - fn write_quorum(&self) -> usize { + fn write_quorum(&self, _: ()) -> usize { let nmembers = self.system.cluster_layout().current().all_nodes().len(); let max_faults = if nmembers > 1 { 1 } else { 0 }; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 682c1ea6..6edfd352 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -4,6 +4,7 @@ use garage_util::data::*; /// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync + 'static { type WriteSets: AsRef>> + AsMut>> + Send + Sync + 'static; + type ConsistencyParam: Send + Default; // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods @@ -14,12 +15,12 @@ pub trait TableReplication: Send + Sync + 'static { /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull - fn read_quorum(&self) -> usize; + fn read_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize; /// Which nodes to send writes to fn write_sets(&self, hash: &Hash) -> Self::WriteSets; /// Responses needed to consider a write succesfull in each set - fn write_quorum(&self) -> usize; + fn write_quorum(&self, consistency_param: Self::ConsistencyParam) -> usize; // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index fa5e48d7..40a3ccfa 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use garage_rpc::layout::manager::LayoutManager; use garage_rpc::layout::*; +use garage_rpc::replication_mode::*; use garage_util::data::*; use crate::replication::*; @@ -20,6 +21,7 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { type WriteSets = WriteLock>>; + type ConsistencyParam = ConsistencyMode; fn storage_nodes(&self, hash: &Hash) -> Vec { self.layout_manager.layout().storage_nodes_of(hash) @@ -28,15 +30,15 @@ impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec { self.layout_manager.layout().read_nodes_of(hash) } - fn read_quorum(&self) -> usize { - self.layout_manager.read_quorum() + fn read_quorum(&self, c: ConsistencyMode) -> usize { + self.layout_manager.read_quorum(c) } fn write_sets(&self, hash: &Hash) -> Self::WriteSets { self.layout_manager.write_sets_of(hash) } - fn write_quorum(&self) -> usize { - self.layout_manager.write_quorum() + fn write_quorum(&self, c: ConsistencyMode) -> usize { + self.layout_manager.write_quorum(c) } fn partition_of(&self, hash: &Hash) -> Partition { diff --git a/src/table/sync.rs b/src/table/sync.rs index cd080df0..ab96c493 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -118,7 +118,7 @@ impl TableSyncer { ); let mut result_tracker = QuorumSetResultTracker::new( &partition.storage_sets, - self.data.replication.write_quorum(), + self.data.replication.write_quorum(Default::default()), ); let mut sync_futures = result_tracker @@ -190,7 +190,7 @@ impl TableSyncer { ); break; } - if nodes.len() < self.data.replication.write_quorum() { + if nodes.len() < self.data.replication.write_quorum(Default::default()) { return Err(Error::Message( "Not offloading as we don't have a quorum of nodes to write to." .to_string(), diff --git a/src/table/table.rs b/src/table/table.rs index a5be2910..8e5a6bb4 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -104,11 +104,11 @@ impl Table { bg.spawn_worker(InsertQueueWorker(self.clone())); } - pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + pub async fn insert(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); - self.insert_internal(e) + self.insert_internal(c, e) .bound_record_duration(&self.data.metrics.put_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -118,7 +118,7 @@ impl Table { Ok(()) } - async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { + async fn insert_internal(&self, c: R::ConsistencyParam, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); let who = self.data.replication.write_sets(&hash); @@ -132,7 +132,7 @@ impl Table { who.as_ref(), rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.write_quorum()), + .with_quorum(self.data.replication.write_quorum(c)), ) .await?; @@ -144,7 +144,11 @@ impl Table { self.data.queue_insert(tx, e) } - pub async fn insert_many(self: &Arc, entries: I) -> Result<(), Error> + pub async fn insert_many( + self: &Arc, + c: R::ConsistencyParam, + entries: I, + ) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, @@ -152,7 +156,7 @@ impl Table { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); - self.insert_many_internal(entries) + self.insert_many_internal(c, entries) .bound_record_duration(&self.data.metrics.put_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -162,7 +166,11 @@ impl Table { Ok(()) } - async fn insert_many_internal(self: &Arc, entries: I) -> Result<(), Error> + async fn insert_many_internal( + self: &Arc, + c: R::ConsistencyParam, + entries: I, + ) -> Result<(), Error> where I: IntoIterator + Send + Sync, IE: Borrow + Send + Sync, @@ -181,7 +189,7 @@ impl Table { // a quorum of nodes has answered OK, then the insert has succeeded and // consistency properties (read-after-write) are preserved. - let quorum = self.data.replication.write_quorum(); + let quorum = self.data.replication.write_quorum(c); // Serialize all entries and compute the write sets for each of them. // In the case of sharded table replication, this also takes an "ack lock" @@ -283,6 +291,7 @@ impl Table { pub async fn get( self: &Arc, + c: R::ConsistencyParam, partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { @@ -290,7 +299,7 @@ impl Table { let span = tracer.start(format!("{} get", F::TABLE_NAME)); let res = self - .get_internal(partition_key, sort_key) + .get_internal(c, partition_key, sort_key) .bound_record_duration(&self.data.metrics.get_request_duration) .with_context(Context::current_with_span(span)) .await?; @@ -302,6 +311,7 @@ impl Table { async fn get_internal( self: &Arc, + c: R::ConsistencyParam, partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { @@ -317,7 +327,7 @@ impl Table { &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()), + .with_quorum(self.data.replication.read_quorum(c)), ) .await?; @@ -359,6 +369,7 @@ impl Table { pub async fn get_range( self: &Arc, + c: R::ConsistencyParam, partition_key: &F::P, begin_sort_key: Option, filter: Option, @@ -370,6 +381,7 @@ impl Table { let res = self .get_range_internal( + c, partition_key, begin_sort_key, filter, @@ -387,6 +399,7 @@ impl Table { async fn get_range_internal( self: &Arc, + c: R::ConsistencyParam, partition_key: &F::P, begin_sort_key: Option, filter: Option, @@ -412,7 +425,7 @@ impl Table { &who, rpc, RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(self.data.replication.read_quorum()), + .with_quorum(self.data.replication.read_quorum(c)), ) .await?; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 69939f65..813e5a47 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -28,6 +28,7 @@ use garage_api::s3::error::{ }; use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx}; +use garage_model::bucket_table::BucketParams; use garage_model::garage::Garage; use garage_table::*; @@ -182,11 +183,20 @@ impl WebServer { } } - async fn check_key_exists(self: &Arc, bucket_id: Uuid, key: &str) -> Result { + async fn check_key_exists( + self: &Arc, + bucket_id: Uuid, + bucket_params: &BucketParams, + key: &str, + ) -> Result { let exists = self .garage .object_table - .get(&bucket_id, &key.to_string()) + .get( + *bucket_params.consistency_mode.get(), + &bucket_id, + &key.to_string(), + ) .await? .map(|object| object.versions().iter().any(|v| v.is_data())) .unwrap_or(false); @@ -211,7 +221,7 @@ impl WebServer { let bucket_id = self .garage .bucket_alias_table - .get(&EmptyKey, &bucket_name.to_string()) + .get((), &EmptyKey, &bucket_name.to_string()) .await? .and_then(|x| x.state.take()) .ok_or(Error::NotFound)?; @@ -246,13 +256,22 @@ impl WebServer { .map_err(ApiError::from) .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), Method::HEAD => { - handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await + handle_head_without_ctx( + self.garage.clone(), + req, + bucket_id, + &bucket_params, + &key, + None, + ) + .await } Method::GET => { handle_get_without_ctx( self.garage.clone(), req, bucket_id, + &bucket_params, &key, None, Default::default(), @@ -265,7 +284,9 @@ impl WebServer { // Try implicit redirect on error let ret_doc_with_redir = match (&ret_doc, may_redirect) { (Err(ApiError::NoSuchKey), ImplicitRedirect::To { key, url }) - if self.check_key_exists(bucket_id, key.as_str()).await? => + if self + .check_key_exists(bucket_id, &bucket_params, key.as_str()) + .await? => { Ok(Response::builder() .status(StatusCode::FOUND) @@ -306,6 +327,7 @@ impl WebServer { self.garage.clone(), &req2, bucket_id, + &bucket_params, &error_document, None, Default::default(),