diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 7f89d4b2..a91940d7 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -79,18 +79,24 @@ impl RequestHandler for GetBucketInfoRequest { garage: &Arc, _admin: &Admin, ) -> Result { - let bucket_id = match (self.id, self.global_alias, self.search) { - (Some(id), None, None) => parse_bucket_id(&id)?, - (None, Some(ga), None) => garage - .bucket_alias_table - .get(&EmptyKey, &ga) - .await? - .and_then(|x| *x.state.get()) - .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?, + let bucket = match (self.id, self.global_alias, self.search) { + (Some(id), None, None) => { + let id = parse_bucket_id(&id)?; + garage.bucket_helper().get_existing_bucket(id).await? + } + (None, Some(ga), None) => { + let id = garage + .bucket_alias_table + .get(&EmptyKey, &ga) + .await? + .and_then(|x| *x.state.get()) + .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?; + garage.bucket_helper().get_existing_bucket(id).await? + } (None, None, Some(search)) => { let helper = garage.bucket_helper(); - if let Some(uuid) = helper.resolve_global_bucket_name(&search).await? { - uuid + if let Some(bucket) = helper.resolve_global_bucket(&search).await? { + bucket } else { let hexdec = if search.len() >= 2 { search @@ -124,7 +130,7 @@ impl RequestHandler for GetBucketInfoRequest { if candidates.is_empty() { return Err(Error::Common(CommonError::NoSuchBucket(search.clone()))); } else if candidates.len() == 1 { - candidates.into_iter().next().unwrap().id + candidates.into_iter().next().unwrap() } else { return Err(Error::bad_request(format!( "Several matching buckets: {}", @@ -140,23 +146,18 @@ impl RequestHandler for GetBucketInfoRequest { } }; - bucket_info_results(garage, bucket_id).await + bucket_info_results(garage, bucket).await } } async fn bucket_info_results( garage: &Arc, - bucket_id: Uuid, + bucket: Bucket, ) -> Result { - let bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let counters = garage .object_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(&bucket.id, &EmptyKey) .await? .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); @@ -164,7 +165,7 @@ async fn bucket_info_results( let mpu_counters = garage .mpu_counter_table .table - .get(&bucket_id, &EmptyKey) + .get(&bucket.id, &EmptyKey) .await? .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); @@ -336,7 +337,7 @@ impl RequestHandler for CreateBucketRequest { } Ok(CreateBucketResponse( - bucket_info_results(garage, bucket.id).await?, + bucket_info_results(garage, bucket).await?, )) } } @@ -444,7 +445,7 @@ impl RequestHandler for UpdateBucketRequest { garage.bucket_table.insert(&bucket).await?; Ok(UpdateBucketResponse( - bucket_info_results(garage, bucket_id).await?, + bucket_info_results(garage, bucket).await?, )) } } @@ -534,7 +535,7 @@ pub async fn handle_bucket_change_key_perm( .set_bucket_key_permissions(bucket.id, &key.key_id, perm) .await?; - bucket_info_results(garage, bucket.id).await + bucket_info_results(garage, bucket).await } // ---- BUCKET ALIASES ---- @@ -551,11 +552,11 @@ impl RequestHandler for AddBucketAliasRequest { let helper = garage.locked_helper().await; - match self.alias { + let bucket = match self.alias { BucketAliasEnum::Global { global_alias } => { helper .set_global_bucket_alias(bucket_id, &global_alias) - .await?; + .await? } BucketAliasEnum::Local { local_alias, @@ -563,12 +564,12 @@ impl RequestHandler for AddBucketAliasRequest { } => { helper .set_local_bucket_alias(bucket_id, &access_key_id, &local_alias) - .await?; + .await? } - } + }; Ok(AddBucketAliasResponse( - bucket_info_results(garage, bucket_id).await?, + bucket_info_results(garage, bucket).await?, )) } } @@ -585,11 +586,11 @@ impl RequestHandler for RemoveBucketAliasRequest { let helper = garage.locked_helper().await; - match self.alias { + let bucket = match self.alias { BucketAliasEnum::Global { global_alias } => { helper .unset_global_bucket_alias(bucket_id, &global_alias) - .await?; + .await? } BucketAliasEnum::Local { local_alias, @@ -597,12 +598,12 @@ impl RequestHandler for RemoveBucketAliasRequest { } => { helper .unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias) - .await?; + .await? } - } + }; Ok(RemoveBucketAliasResponse( - bucket_info_results(garage, bucket_id).await?, + bucket_info_results(garage, bucket).await?, )) } } diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs index 0ecf82bc..0a4e6705 100644 --- a/src/api/admin/special.rs +++ b/src/api/admin/special.rs @@ -151,12 +151,11 @@ async fn check_domain(garage: &Arc, domain: &str) -> Result (domain.to_string(), true) }; - let bucket_id = match garage + let bucket = match garage .bucket_helper() - .resolve_global_bucket_name(&bucket_name) - .await? + .resolve_global_bucket_fast(&bucket_name)? { - Some(bucket_id) => bucket_id, + Some(b) => b, None => return Ok(false), }; @@ -164,11 +163,6 @@ async fn check_domain(garage: &Arc, domain: &str) -> Result return Ok(true); } - let bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - let bucket_state = bucket.state.as_option().unwrap(); let bucket_website_config = bucket_state.website_config.get(); diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs index 09b55c13..a0ba6e48 100644 --- a/src/api/common/cors.rs +++ b/src/api/common/cors.rs @@ -9,9 +9,7 @@ use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, Statu use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; -use crate::common_error::{ - helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError, -}; +use crate::common_error::{CommonError, OkOrBadRequest, OkOrInternalError}; use crate::helpers::*; pub fn find_matching_cors_rule<'a, B>( @@ -76,7 +74,7 @@ pub fn add_cors_headers( Ok(()) } -pub async fn handle_options_api( +pub fn handle_options_api( garage: Arc, req: &Request, bucket_name: Option, @@ -93,16 +91,8 @@ pub async fn handle_options_api( // OPTIONS calls are not auhtenticated). if let Some(bn) = bucket_name { let helper = garage.bucket_helper(); - let bucket_id = helper - .resolve_global_bucket_name(&bn) - .await - .map_err(helper_error_as_internal)?; - if let Some(id) = bucket_id { - let bucket = garage - .bucket_helper() - .get_existing_bucket(id) - .await - .map_err(helper_error_as_internal)?; + let bucket_opt = helper.resolve_global_bucket_fast(&bn)?; + if let Some(bucket) = bucket_opt { let bucket_params = bucket.state.into_option().unwrap(); handle_options_for_bucket(req, &bucket_params) } else { diff --git a/src/api/common/signature/mod.rs b/src/api/common/signature/mod.rs index 50fbd304..6f1748c3 100644 --- a/src/api/common/signature/mod.rs +++ b/src/api/common/signature/mod.rs @@ -64,12 +64,12 @@ pub struct VerifiedRequest { pub content_sha256_header: ContentSha256Header, } -pub async fn verify_request( +pub fn verify_request( garage: &Garage, mut req: Request, service: &'static str, ) -> Result { - let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?; + let checked_signature = payload::check_payload_signature(&garage, &mut req, service)?; let request = streaming::parse_streaming_body( req, diff --git a/src/api/common/signature/payload.rs b/src/api/common/signature/payload.rs index 2d5f8603..8386607d 100644 --- a/src/api/common/signature/payload.rs +++ b/src/api/common/signature/payload.rs @@ -32,7 +32,7 @@ pub struct CheckedSignature { pub signature_header: Option, } -pub async fn check_payload_signature( +pub fn check_payload_signature( garage: &Garage, request: &mut Request, service: &'static str, @@ -43,9 +43,9 @@ pub async fn check_payload_signature( // We check for presigned-URL-style authentication first, because // the browser or something else could inject an Authorization header // that is totally unrelated to AWS signatures. - check_presigned_signature(garage, service, request, query).await + check_presigned_signature(garage, service, request, query) } else if request.headers().contains_key(AUTHORIZATION) { - check_standard_signature(garage, service, request, query).await + check_standard_signature(garage, service, request, query) } else { // Unsigned (anonymous) request let content_sha256 = request @@ -93,7 +93,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result, @@ -128,7 +128,7 @@ async fn check_standard_signature( trace!("canonical request:\n{}", canonical_request); trace!("string to sign:\n{}", string_to_sign); - let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; + let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?; let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?; @@ -139,7 +139,7 @@ async fn check_standard_signature( }) } -async fn check_presigned_signature( +fn check_presigned_signature( garage: &Garage, service: &'static str, request: &mut Request, @@ -178,7 +178,7 @@ async fn check_presigned_signature( trace!("canonical request (presigned url):\n{}", canonical_request); trace!("string to sign (presigned url):\n{}", string_to_sign); - let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?; + let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?; // In the page on presigned URLs, AWS specifies that if a signed query // parameter and a signed header of the same name have different values, @@ -378,7 +378,7 @@ pub fn parse_date(date: &str) -> Result, Error> { Ok(Utc.from_utc_datetime(&date)) } -pub async fn verify_v4( +pub fn verify_v4( garage: &Garage, service: &str, auth: &Authorization, @@ -391,8 +391,7 @@ pub async fn verify_v4( let key = garage .key_table - .get(&EmptyKey, &auth.key_id) - .await? + .get_local(&EmptyKey, &auth.key_id)? .filter(|k| !k.state.is_deleted()) .ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?; let key_p = key.params().unwrap(); diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index dfa22dd2..8ace37d4 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -77,25 +77,19 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is processed early, before we even check for an API key if let Endpoint::Options = endpoint { let options_res = handle_options_api(garage, &req, Some(bucket_name)) - .await .ok_or_bad_request("Error handling OPTIONS")?; return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let verified_request = verify_request(&garage, req, "k2v").await?; + let verified_request = verify_request(&garage, req, "k2v")?; let req = verified_request.request; let api_key = verified_request.access_key; - let bucket_id = garage - .bucket_helper() - .resolve_bucket(&bucket_name, &api_key) - .await - .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() - .get_existing_bucket(bucket_id) - .await - .map_err(helper_error_as_internal)?; + .resolve_bucket_fast(&bucket_name, &api_key) + .map_err(pass_helper_error)?; + let bucket_id = bucket.id; let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 257ff893..55737268 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -2,8 +2,8 @@ use err_derive::Error; use hyper::header::HeaderValue; use hyper::{HeaderMap, StatusCode}; +pub(crate) use garage_api_common::common_error::pass_helper_error; use garage_api_common::common_error::{commonErrorDerivative, CommonError}; -pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error}; pub use garage_api_common::common_error::{ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError, }; diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 4cca21ed..1c967d58 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -118,11 +118,11 @@ impl ApiHandler for S3ApiServer { return handle_post_object(garage, req, bucket_name.unwrap()).await; } if let Endpoint::Options = endpoint { - let options_res = handle_options_api(garage, &req, bucket_name).await?; + let options_res = handle_options_api(garage, &req, bucket_name)?; return Ok(options_res.map(|_empty_body: EmptyBody| empty_body())); } - let verified_request = verify_request(&garage, req, "s3").await?; + let verified_request = verify_request(&garage, req, "s3")?; let req = verified_request.request; let api_key = verified_request.access_key; @@ -140,15 +140,11 @@ impl ApiHandler for S3ApiServer { return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await; } - let bucket_id = garage - .bucket_helper() - .resolve_bucket(&bucket_name, &api_key) - .await - .map_err(pass_helper_error)?; let bucket = garage .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; + .resolve_bucket_fast(&bucket_name, &api_key) + .map_err(pass_helper_error)?; + let bucket_id = bucket.id; let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 3a09e769..7b5d714f 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -143,21 +143,16 @@ pub async fn handle_create_bucket( let api_key = helper.key().get_existing_key(api_key_id).await?; let key_params = api_key.params().unwrap(); - let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name) - { - Some(*bucket_id) - } else { - helper - .bucket() - .resolve_global_bucket_name(&bucket_name) - .await? - }; + let existing_bucket = helper + .bucket() + .resolve_bucket(&bucket_name, &api_key.key_id) + .await?; - if let Some(bucket_id) = existing_bucket { + if let Some(bucket) = existing_bucket { // Check we have write or owner permission on the bucket, // in that case it's fine, return 200 OK, bucket exists; // otherwise return a forbidden error. - let kp = api_key.bucket_permissions(&bucket_id); + let kp = api_key.bucket_permissions(&bucket.id); if !(kp.allow_write || kp.allow_owner) { return Err(CommonError::BucketAlreadyExists.into()); } diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7c67a65d..8892d4ff 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -683,16 +683,15 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result) -> Result(pub(crate) &'a Garage); #[allow(clippy::ptr_arg)] impl<'a> BucketHelper<'a> { - pub async fn resolve_global_bucket_name( + // ================ + // Local functions to find buckets FAST. + // This is only for the fast path in API requests. + // They do not conserve the read-after-write guarantee. + // ================ + + /// Return bucket ID corresponding to global bucket name. + /// + /// The name can be of two forms: + /// 1. A global bucket alias + /// 2. The full ID of a bucket encoded in hex + /// + /// This will not do any network interaction to check the alias table, + /// it will only check the local copy of the table. + /// As a consequence, it does not conserve read-after-write guarantees. + pub fn resolve_global_bucket_fast( &self, bucket_name: &String, - ) -> Result, Error> { + ) -> Result, GarageError> { // Bucket names in Garage are aliases, true bucket identifiers // are 32-byte UUIDs. This function resolves bucket names into // their full identifier by looking up in the bucket_alias_table. @@ -32,38 +47,129 @@ impl<'a> BucketHelper<'a> { let hexbucket = hex::decode(bucket_name.as_str()) .ok() .and_then(|by| Uuid::try_from(&by)); - if let Some(bucket_id) = hexbucket { - Ok(self - .0 - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|x| !x.state.is_deleted()) - .map(|_| bucket_id)) - } else { - Ok(self - .0 - .bucket_alias_table - .get(&EmptyKey, bucket_name) - .await? - .and_then(|x| *x.state.get())) - } + let bucket_id = match hexbucket { + Some(id) => id, + None => { + let alias = self + .0 + .bucket_alias_table + .get_local(&EmptyKey, bucket_name)? + .and_then(|x| *x.state.get()); + match alias { + Some(id) => id, + None => return Ok(None), + } + } + }; + Ok(self + .0 + .bucket_table + .get_local(&EmptyKey, &bucket_id)? + .filter(|x| !x.state.is_deleted())) } + /// Return bucket ID corresponding to a bucket name from the perspective of + /// a given access key. + /// + /// The name can be of three forms: + /// 1. A global bucket alias + /// 2. A local bucket alias + /// 3. The full ID of a bucket encoded in hex + /// + /// This will not do any network interaction to check the alias table, + /// it will only check the local copy of the table. + /// As a consequence, it does not conserve read-after-write guarantees. + /// + /// This function transforms non-existing buckets in a NoSuchBucket error. #[allow(clippy::ptr_arg)] - pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result { + pub fn resolve_bucket_fast( + &self, + bucket_name: &String, + api_key: &Key, + ) -> Result { let api_key_params = api_key .state .as_option() .ok_or_message("Key should not be deleted at this point")?; - if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { - Ok(*bucket_id) - } else { + let bucket_opt = + if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { + self.0 + .bucket_table + .get_local(&EmptyKey, &bucket_id)? + .filter(|x| !x.state.is_deleted()) + } else { + self.resolve_global_bucket_fast(bucket_name)? + }; + bucket_opt.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string())) + } + + // ================ + // Global functions that do quorum reads/writes, + // for admin operations. + // ================ + + /// See resolve_global_bucket_fast, + /// but this one does a quorum read to ensure consistency + pub async fn resolve_global_bucket( + &self, + bucket_name: &String, + ) -> Result, GarageError> { + let hexbucket = hex::decode(bucket_name.as_str()) + .ok() + .and_then(|by| Uuid::try_from(&by)); + let bucket_id = match hexbucket { + Some(id) => id, + None => { + let alias = self + .0 + .bucket_alias_table + .get(&EmptyKey, bucket_name) + .await? + .and_then(|x| *x.state.get()); + match alias { + Some(id) => id, + None => return Ok(None), + } + } + }; + Ok(self + .0 + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .filter(|x| !x.state.is_deleted())) + } + + /// See resolve_bucket_fast, but this one does a quorum read to ensure consistency. + /// Also, this function does not return a HelperError::NoSuchBucket if bucket is absent. + #[allow(clippy::ptr_arg)] + pub async fn resolve_bucket( + &self, + bucket_name: &String, + key_id: &String, + ) -> Result, GarageError> { + let local_alias = self + .0 + .key_table + .get(&EmptyKey, &key_id) + .await? + .and_then(|k| k.state.into_option()) + .ok_or_else(|| GarageError::Message(format!("access key {} has been deleted", key_id)))? + .local_aliases + .get(bucket_name) + .copied() + .flatten(); + + if let Some(bucket_id) = local_alias { Ok(self - .resolve_global_bucket_name(bucket_name) + .0 + .bucket_table + .get(&EmptyKey, &bucket_id) .await? - .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?) + .filter(|x| !x.state.is_deleted())) + } else { + Ok(self.resolve_global_bucket(bucket_name).await?) } } diff --git a/src/model/helper/locked.rs b/src/model/helper/locked.rs index 482e91b0..9d6a8d36 100644 --- a/src/model/helper/locked.rs +++ b/src/model/helper/locked.rs @@ -6,6 +6,7 @@ use garage_util::time::*; use garage_table::util::*; use crate::bucket_alias_table::*; +use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::bucket::BucketHelper; use crate::helper::error::*; @@ -56,7 +57,7 @@ impl<'a> LockedHelper<'a> { &self, bucket_id: Uuid, alias_name: &String, - ) -> Result<(), Error> { + ) -> Result { if !is_valid_bucket_name(alias_name) { return Err(Error::InvalidBucketName(alias_name.to_string())); } @@ -100,7 +101,7 @@ impl<'a> LockedHelper<'a> { bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true); self.0.bucket_table.insert(&bucket).await?; - Ok(()) + Ok(bucket) } /// Unsets an alias for a bucket in global namespace. @@ -112,7 +113,7 @@ impl<'a> LockedHelper<'a> { &self, bucket_id: Uuid, alias_name: &String, - ) -> Result<(), Error> { + ) -> Result { let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?; let bucket_state = bucket.state.as_option_mut().unwrap(); @@ -156,7 +157,7 @@ impl<'a> LockedHelper<'a> { bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false); self.0.bucket_table.insert(&bucket).await?; - Ok(()) + Ok(bucket) } /// Ensures a bucket does not have a certain global alias. @@ -215,7 +216,7 @@ impl<'a> LockedHelper<'a> { bucket_id: Uuid, key_id: &String, alias_name: &String, - ) -> Result<(), Error> { + ) -> Result { let key_helper = KeyHelper(self.0); if !is_valid_bucket_name(alias_name) { @@ -257,7 +258,7 @@ impl<'a> LockedHelper<'a> { bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true); self.0.bucket_table.insert(&bucket).await?; - Ok(()) + Ok(bucket) } /// Unsets an alias for a bucket in the local namespace of a key. @@ -271,7 +272,7 @@ impl<'a> LockedHelper<'a> { bucket_id: Uuid, key_id: &String, alias_name: &String, - ) -> Result<(), Error> { + ) -> Result { let key_helper = KeyHelper(self.0); let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?; @@ -330,7 +331,7 @@ impl<'a> LockedHelper<'a> { bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false); self.0.bucket_table.insert(&bucket).await?; - Ok(()) + Ok(bucket) } /// Sets permissions for a key on a bucket. diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 821f4549..ddc356b5 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -451,10 +451,7 @@ impl K2VRpcHandler { let mut value = self .item_table - .data - .read_entry(&key.partition, &key.sort_key)? - .map(|bytes| self.item_table.data.decode_entry(&bytes[..])) - .transpose()? + .get_local(&key.partition, &key.sort_key)? .unwrap_or_else(|| { K2VItem::new( key.partition.bucket_id, diff --git a/src/table/table.rs b/src/table/table.rs index c96f4731..565d27a5 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -482,6 +482,15 @@ impl Table { Ok(ret_vec) } + pub fn get_local( + self: &Arc, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result, Error> { + let bytes = self.data.read_entry(partition_key, sort_key)?; + bytes.map(|b| self.data.decode_entry(&b)).transpose() + } + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {