diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 9284f00f..5a35ef56 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 use garage_util::data::*;
 use garage_util::error::Error as GarageError;
 
-use garage_table::TableSchema;
+use garage_table::{EnumerationOrder, TableSchema};
 
 use garage_model::garage::Garage;
 use garage_model::k2v::causality::*;
@@ -115,6 +115,7 @@ async fn handle_read_batch_query(
 		&query.end,
 		query.limit,
 		Some(filter),
+		EnumerationOrder::Forward,
 	)
 	.await?;
 
@@ -222,6 +223,7 @@ async fn handle_delete_batch_query(
 		&query.end,
 		None,
 		Some(filter),
+		EnumerationOrder::Forward,
 	)
 	.await?;
 	assert!(!more);
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index ceb2cf1f..bec1d8c7 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -33,6 +33,7 @@ pub async fn handle_read_index(
 		&end,
 		limit,
 		Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+		EnumerationOrder::Forward,
 	)
 	.await?;
 
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
index ae04d896..7b99b02d 100644
--- a/src/api/k2v/range.rs
+++ b/src/api/k2v/range.rs
@@ -11,6 +11,7 @@ use crate::error::*;
 
 /// Read range in a Garage table.
 /// Returns (entries, more?, nextStart)
+#[allow(clippy::too_many_arguments)]
 pub(crate) async fn read_range<F>(
 	table: &Arc<Table<F, TableShardedReplication>>,
 	partition_key: &F::P,
@@ -19,6 +20,7 @@
 	end: &Option<String>,
 	limit: Option<u64>,
 	filter: Option<F::Filter>,
+	enumeration_order: EnumerationOrder,
 ) -> Result<(Vec<F::E>, bool, Option<String>), Error>
 where
 	F: TableSchema + 'static,
@@ -46,7 +48,13 @@ where
 			limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
 		);
 		let get_ret = table
-			.get_range(partition_key, Some(start.clone()), filter.clone(), n_get)
+			.get_range(
+				partition_key,
+				Some(start.clone()),
+				filter.clone(),
+				n_get,
+				enumeration_order,
+			)
 			.await?;
 		let get_ret_len = get_ret.len();
 
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 92149074..93048a8c 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -230,7 +230,13 @@ pub async fn handle_delete_bucket(
 			// Check bucket is empty
 			let objects = garage
 				.object_table
-				.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
+				.get_range(
+					&bucket_id,
+					None,
+					Some(ObjectFilter::IsData),
+					10,
+					EnumerationOrder::Forward,
+				)
 				.await?;
 			if !objects.is_empty() {
 				return Err(Error::BucketNotEmpty);
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index c0d6721d..966f2fe2 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -13,7 +13,7 @@ use garage_model::garage::Garage;
 use garage_model::s3::object_table::*;
 use garage_model::s3::version_table::Version;
 
-use garage_table::EmptyKey;
+use garage_table::{EmptyKey, EnumerationOrder};
 
 use crate::encoding::*;
 use crate::error::*;
@@ -66,8 +66,14 @@ pub async fn handle_list(
 	let io = |bucket, key, count| {
 		let t = &garage.object_table;
 		async move {
-			t.get_range(&bucket, key, Some(ObjectFilter::IsData), count)
-				.await
+			t.get_range(
+				&bucket,
+				key,
+				Some(ObjectFilter::IsData),
+				count,
+				EnumerationOrder::Forward,
+			)
+			.await
 		}
 	};
 
@@ -165,8 +171,14 @@ pub async fn handle_list_multipart_upload(
 	let io = |bucket, key, count| {
 		let t = &garage.object_table;
 		async move {
-			t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count)
-				.await
+			t.get_range(
+				&bucket,
+				key,
+				Some(ObjectFilter::IsUploading),
+				count,
+				EnumerationOrder::Forward,
+			)
+			.await
 		}
 	};
 
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index de628f1d..af0c3f22 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -80,7 +80,13 @@ impl AdminRpcHandler {
 		let buckets = self
 			.garage
 			.bucket_table
-			.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+			.get_range(
+				&EmptyKey,
+				None,
+				Some(DeletedFilter::NotDeleted),
+				10000,
+				EnumerationOrder::Forward,
+			)
 			.await?;
 		Ok(AdminRpc::BucketList(buckets))
 	}
@@ -210,7 +216,13 @@ impl AdminRpcHandler {
 		let objects = self
 			.garage
 			.object_table
-			.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
+			.get_range(
+				&bucket_id,
+				None,
+				Some(ObjectFilter::IsData),
+				10,
+				EnumerationOrder::Forward,
+			)
 			.await?;
 		if !objects.is_empty() {
 			return Err(Error::BadRequest(format!(
@@ -445,6 +457,7 @@ impl AdminRpcHandler {
 				None,
 				Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
 				10000,
+				EnumerationOrder::Forward,
 			)
 			.await?
 			.iter()
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 706faf26..54d2f97b 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,4 +1,4 @@
-use garage_table::util::EmptyKey;
+use garage_table::util::*;
 use garage_util::crdt::*;
 use garage_util::data::*;
 use garage_util::error::{Error as GarageError, OkOrMessage};
@@ -116,6 +116,7 @@ impl<'a> BucketHelper<'a> {
 				None,
 				Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
 				10,
+				EnumerationOrder::Forward,
 			)
 			.await?
 			.into_iter()
diff --git a/src/table/data.rs b/src/table/data.rs
index daa3c62a..f3ef9f31 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -2,7 +2,7 @@ use core::borrow::Borrow;
 use std::sync::Arc;
 
 use serde_bytes::ByteBuf;
-use sled::Transactional;
+use sled::{IVec, Transactional};
 use tokio::sync::Notify;
 
 use garage_util::data::*;
@@ -16,6 +16,7 @@ use crate::gc::GcTodoEntry;
 use crate::metrics::*;
 use crate::replication::*;
 use crate::schema::*;
+use crate::util::*;
 
 pub struct TableData<F: TableSchema, R: TableReplication> {
 	system: Arc<System>,
@@ -87,14 +88,34 @@ where
 		s: &Option<F::S>,
 		filter: &Option<F::Filter>,
 		limit: usize,
+		enumeration_order: EnumerationOrder,
 	) -> Result<Vec<Arc<ByteBuf>>, Error> {
 		let partition_hash = p.hash();
 		let first_key = match s {
 			None => partition_hash.to_vec(),
 			Some(sk) => self.tree_key(p, sk),
 		};
+		match enumeration_order {
+			EnumerationOrder::Forward => {
+				let range = self.store.range(first_key..);
+				self.read_range_aux(partition_hash, range, filter, limit)
+			}
+			EnumerationOrder::Reverse => {
+				let range = self.store.range(..=first_key).rev();
+				self.read_range_aux(partition_hash, range, filter, limit)
+			}
+		}
+	}
+
+	fn read_range_aux(
+		&self,
+		partition_hash: Hash,
+		range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
+		filter: &Option<F::Filter>,
+		limit: usize,
+	) -> Result<Vec<Arc<ByteBuf>>, Error> {
 		let mut ret = vec![];
-		for item in self.store.range(first_key..) {
+		for item in range {
 			let (key, value) = item?;
 			if &key[..32] != partition_hash.as_slice() {
 				break;
diff --git a/src/table/table.rs b/src/table/table.rs
index f3e5b881..e26eb215 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -26,6 +26,7 @@ use crate::merkle::*;
 use crate::replication::*;
 use crate::schema::*;
 use crate::sync::*;
+use crate::util::*;
 
 pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
 
@@ -45,7 +46,13 @@ pub(crate) enum TableRpc<F: TableSchema> {
 	ReadEntryResponse(Option<ByteBuf>),
 
 	// Read range: read all keys in partition P, possibly starting at a certain sort key offset
-	ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
+	ReadRange {
+		partition: F::P,
+		begin_sort_key: Option<F::S>,
+		filter: Option<F::Filter>,
+		limit: usize,
+		enumeration_order: EnumerationOrder,
+	},
 
 	Update(Vec<Arc<ByteBuf>>),
 }
@@ -261,12 +268,19 @@
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
 		limit: usize,
+		enumeration_order: EnumerationOrder,
 	) -> Result<Vec<F::E>, Error> {
 		let tracer = opentelemetry::global::tracer("garage_table");
 		let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
 
 		let res = self
-			.get_range_internal(partition_key, begin_sort_key, filter, limit)
+			.get_range_internal(
+				partition_key,
+				begin_sort_key,
+				filter,
+				limit,
+				enumeration_order,
+			)
 			.bound_record_duration(&self.data.metrics.get_request_duration)
 			.with_context(Context::current_with_span(span))
 			.await?;
@@ -282,11 +296,18 @@
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
 		limit: usize,
+		enumeration_order: EnumerationOrder,
 	) -> Result<Vec<F::E>, Error> {
 		let hash = partition_key.hash();
 		let who = self.data.replication.read_nodes(&hash);
 
-		let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+		let rpc = TableRpc::<F>::ReadRange {
+			partition: partition_key.clone(),
+			begin_sort_key,
+			filter,
+			limit,
+			enumeration_order,
+		};
 
 		let resps = self
 			.system
@@ -302,44 +323,65 @@ where
 			)
 			.await?;
 
-		let mut ret = BTreeMap::new();
-		let mut to_repair = BTreeMap::new();
+		let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
+		let mut to_repair = BTreeSet::new();
 		for resp in resps {
 			if let TableRpc::Update(entries) = resp {
 				for entry_bytes in entries.iter() {
 					let entry = self.data.decode_entry(entry_bytes.as_slice())?;
 					let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
-					match ret.remove(&entry_key) {
-						None => {
-							ret.insert(entry_key, Some(entry));
-						}
-						Some(Some(mut prev)) => {
-							let must_repair = prev != entry;
-							prev.merge(&entry);
-							if must_repair {
-								to_repair.insert(entry_key.clone(), Some(prev.clone()));
+					match ret.get_mut(&entry_key) {
+						Some(e) => {
+							if *e != entry {
+								e.merge(&entry);
+								to_repair.insert(entry_key.clone());
 							}
-							ret.insert(entry_key, Some(prev));
 						}
-						Some(None) => unreachable!(),
+						None => {
+							ret.insert(entry_key, entry);
+						}
 					}
 				}
+			} else {
+				return Err(Error::unexpected_rpc_message(resp));
 			}
 		}
+
 		if !to_repair.is_empty() {
 			let self2 = self.clone();
+			let to_repair = to_repair
+				.into_iter()
+				.map(|k| ret.get(&k).unwrap().clone())
+				.collect::<Vec<_>>();
 			self.system.background.spawn_cancellable(async move {
-				for (_, v) in to_repair.iter_mut() {
-					self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+				for v in to_repair {
+					self2.repair_on_read(&who[..], v).await?;
 				}
 				Ok(())
 			});
 		}
-		let ret_vec = ret
-			.iter_mut()
-			.take(limit)
-			.map(|(_k, v)| v.take().unwrap())
-			.collect::<Vec<_>>();
+
+		// At this point, the `ret` btreemap might contain more than `limit`
+		// items, because each node might have returned us `limit` items,
+		// but for different keys. We have to take only the first `limit` items
+		// in this map, in the specified enumeration order, for two reasons:
+		// 1. To return to the user no more than the number of items that they requested
+		// 2. To return only items for which we have a read quorum: we do not know
+		//    that we have a read quorum for the items after the first `limit`
+		//    of them
+		let ret_vec = match enumeration_order {
+			EnumerationOrder::Forward => ret
+				.into_iter()
+				.take(limit)
+				.map(|(_k, v)| v)
+				.collect::<Vec<_>>(),
+			EnumerationOrder::Reverse => ret
+				.into_iter()
+				.rev()
+				.take(limit)
+				.map(|(_k, v)| v)
+				.collect::<Vec<_>>(),
		};
 		Ok(ret_vec)
 	}
 
@@ -378,8 +420,20 @@ where
 				let value = self.data.read_entry(key, sort_key)?;
 				Ok(TableRpc::ReadEntryResponse(value))
 			}
-			TableRpc::ReadRange(key, begin_sort_key, filter, limit) => {
-				let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
+			TableRpc::ReadRange {
+				partition,
+				begin_sort_key,
+				filter,
+				limit,
+				enumeration_order,
+			} => {
+				let values = self.data.read_range(
+					partition,
+					begin_sort_key,
+					filter,
+					*limit,
+					*enumeration_order,
+				)?;
 				Ok(TableRpc::Update(values))
 			}
 			TableRpc::Update(pairs) => {
diff --git a/src/table/util.rs b/src/table/util.rs
index 6496ba87..20595a94 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -33,3 +33,19 @@ impl DeletedFilter {
 		}
 	}
 }
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub enum EnumerationOrder {
+	Forward,
+	Reverse,
+}
+
+impl EnumerationOrder {
+	pub fn from_reverse(reverse: bool) -> Self {
+		if reverse {
+			Self::Reverse
+		} else {
+			Self::Forward
+		}
+	}
+}
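
For context, a minimal standalone sketch of the enumeration semantics this patch introduces (not part of the diff): the EnumerationOrder enum and from_reverse are copied from src/table/util.rs above, minus the serde derives, while the in-memory BTreeMap store and the scan helper are illustrative stand-ins for the sled tree and TableData::read_range.

use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EnumerationOrder {
	Forward,
	Reverse,
}

impl EnumerationOrder {
	pub fn from_reverse(reverse: bool) -> Self {
		if reverse {
			Self::Reverse
		} else {
			Self::Forward
		}
	}
}

// Illustrative stand-in for TableData::read_range: enumerate at most `limit`
// entries starting at `first_key`, in the requested order.
fn scan<'a>(
	store: &'a BTreeMap<String, String>,
	first_key: &str,
	limit: usize,
	order: EnumerationOrder,
) -> Vec<(&'a String, &'a String)> {
	match order {
		// Forward: all keys >= first_key, ascending (mirrors store.range(first_key..))
		EnumerationOrder::Forward => store.range(first_key.to_string()..).take(limit).collect(),
		// Reverse: all keys <= first_key, descending (mirrors store.range(..=first_key).rev())
		EnumerationOrder::Reverse => store
			.range(..=first_key.to_string())
			.rev()
			.take(limit)
			.collect(),
	}
}

fn main() {
	let mut store = BTreeMap::new();
	for k in ["a", "b", "c", "d"] {
		store.insert(k.to_string(), format!("value-{}", k));
	}
	// Forward from "b", limit 2 -> [("b", ...), ("c", ...)]
	println!("{:?}", scan(&store, "b", 2, EnumerationOrder::from_reverse(false)));
	// Reverse from "c", limit 2 -> [("c", ...), ("b", ...)]
	println!("{:?}", scan(&store, "c", 2, EnumerationOrder::from_reverse(true)));
}

Note that, as in the patch, the reverse scan starts at first_key inclusively and walks backwards, which is why data.rs uses range(..=first_key).rev() rather than a plain reversed forward range.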