Add EnumerationOrder parameter to table range queries
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is passing

This commit is contained in:
Alex 2022-04-28 10:31:57 +02:00
parent d7e2eb166d
commit 3ac6970a24
Signed by: lx
GPG key ID: 0E496D15096376BE
10 changed files with 173 additions and 39 deletions

View file

@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_table::TableSchema; use garage_table::{EnumerationOrder, TableSchema};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::k2v::causality::*; use garage_model::k2v::causality::*;
@ -115,6 +115,7 @@ async fn handle_read_batch_query(
&query.end, &query.end,
query.limit, query.limit,
Some(filter), Some(filter),
EnumerationOrder::Forward,
) )
.await?; .await?;
@ -222,6 +223,7 @@ async fn handle_delete_batch_query(
&query.end, &query.end,
None, None,
Some(filter), Some(filter),
EnumerationOrder::Forward,
) )
.await?; .await?;
assert!(!more); assert!(!more);

View file

@ -33,6 +33,7 @@ pub async fn handle_read_index(
&end, &end,
limit, limit,
Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
EnumerationOrder::Forward,
) )
.await?; .await?;

View file

@ -11,6 +11,7 @@ use crate::error::*;
/// Read range in a Garage table. /// Read range in a Garage table.
/// Returns (entries, more?, nextStart) /// Returns (entries, more?, nextStart)
#[allow(clippy::too_many_arguments)]
pub(crate) async fn read_range<F>( pub(crate) async fn read_range<F>(
table: &Arc<Table<F, TableShardedReplication>>, table: &Arc<Table<F, TableShardedReplication>>,
partition_key: &F::P, partition_key: &F::P,
@ -19,6 +20,7 @@ pub(crate) async fn read_range<F>(
end: &Option<String>, end: &Option<String>,
limit: Option<u64>, limit: Option<u64>,
filter: Option<F::Filter>, filter: Option<F::Filter>,
enumeration_order: EnumerationOrder,
) -> Result<(Vec<F::E>, bool, Option<String>), Error> ) -> Result<(Vec<F::E>, bool, Option<String>), Error>
where where
F: TableSchema<S = String> + 'static, F: TableSchema<S = String> + 'static,
@ -46,7 +48,13 @@ where
limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2, limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
); );
let get_ret = table let get_ret = table
.get_range(partition_key, Some(start.clone()), filter.clone(), n_get) .get_range(
partition_key,
Some(start.clone()),
filter.clone(),
n_get,
enumeration_order,
)
.await?; .await?;
let get_ret_len = get_ret.len(); let get_ret_len = get_ret.len();

View file

@ -230,7 +230,13 @@ pub async fn handle_delete_bucket(
// Check bucket is empty // Check bucket is empty
let objects = garage let objects = garage
.object_table .object_table
.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) .get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?; .await?;
if !objects.is_empty() { if !objects.is_empty() {
return Err(Error::BucketNotEmpty); return Err(Error::BucketNotEmpty);

View file

@ -13,7 +13,7 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*; use garage_model::s3::object_table::*;
use garage_model::s3::version_table::Version; use garage_model::s3::version_table::Version;
use garage_table::EmptyKey; use garage_table::{EmptyKey, EnumerationOrder};
use crate::encoding::*; use crate::encoding::*;
use crate::error::*; use crate::error::*;
@ -66,8 +66,14 @@ pub async fn handle_list(
let io = |bucket, key, count| { let io = |bucket, key, count| {
let t = &garage.object_table; let t = &garage.object_table;
async move { async move {
t.get_range(&bucket, key, Some(ObjectFilter::IsData), count) t.get_range(
.await &bucket,
key,
Some(ObjectFilter::IsData),
count,
EnumerationOrder::Forward,
)
.await
} }
}; };
@ -165,8 +171,14 @@ pub async fn handle_list_multipart_upload(
let io = |bucket, key, count| { let io = |bucket, key, count| {
let t = &garage.object_table; let t = &garage.object_table;
async move { async move {
t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count) t.get_range(
.await &bucket,
key,
Some(ObjectFilter::IsUploading),
count,
EnumerationOrder::Forward,
)
.await
} }
}; };

View file

@ -80,7 +80,13 @@ impl AdminRpcHandler {
let buckets = self let buckets = self
.garage .garage
.bucket_table .bucket_table
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) .get_range(
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
10000,
EnumerationOrder::Forward,
)
.await?; .await?;
Ok(AdminRpc::BucketList(buckets)) Ok(AdminRpc::BucketList(buckets))
} }
@ -210,7 +216,13 @@ impl AdminRpcHandler {
let objects = self let objects = self
.garage .garage
.object_table .object_table
.get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) .get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?; .await?;
if !objects.is_empty() { if !objects.is_empty() {
return Err(Error::BadRequest(format!( return Err(Error::BadRequest(format!(
@ -445,6 +457,7 @@ impl AdminRpcHandler {
None, None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
10000, 10000,
EnumerationOrder::Forward,
) )
.await? .await?
.iter() .iter()

View file

@ -1,4 +1,4 @@
use garage_table::util::EmptyKey; use garage_table::util::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage}; use garage_util::error::{Error as GarageError, OkOrMessage};
@ -116,6 +116,7 @@ impl<'a> BucketHelper<'a> {
None, None,
Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
10, 10,
EnumerationOrder::Forward,
) )
.await? .await?
.into_iter() .into_iter()

View file

@ -2,7 +2,7 @@ use core::borrow::Borrow;
use std::sync::Arc; use std::sync::Arc;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use sled::Transactional; use sled::{IVec, Transactional};
use tokio::sync::Notify; use tokio::sync::Notify;
use garage_util::data::*; use garage_util::data::*;
@ -16,6 +16,7 @@ use crate::gc::GcTodoEntry;
use crate::metrics::*; use crate::metrics::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::util::*;
pub struct TableData<F: TableSchema, R: TableReplication> { pub struct TableData<F: TableSchema, R: TableReplication> {
system: Arc<System>, system: Arc<System>,
@ -87,14 +88,34 @@ where
s: &Option<F::S>, s: &Option<F::S>,
filter: &Option<F::Filter>, filter: &Option<F::Filter>,
limit: usize, limit: usize,
enumeration_order: EnumerationOrder,
) -> Result<Vec<Arc<ByteBuf>>, Error> { ) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash(); let partition_hash = p.hash();
let first_key = match s { let first_key = match s {
None => partition_hash.to_vec(), None => partition_hash.to_vec(),
Some(sk) => self.tree_key(p, sk), Some(sk) => self.tree_key(p, sk),
}; };
match enumeration_order {
EnumerationOrder::Forward => {
let range = self.store.range(first_key..);
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => {
let range = self.store.range(..=first_key).rev();
self.read_range_aux(partition_hash, range, filter, limit)
}
}
}
fn read_range_aux(
&self,
partition_hash: Hash,
range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let mut ret = vec![]; let mut ret = vec![];
for item in self.store.range(first_key..) { for item in range {
let (key, value) = item?; let (key, value) = item?;
if &key[..32] != partition_hash.as_slice() { if &key[..32] != partition_hash.as_slice() {
break; break;

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -26,6 +26,7 @@ use crate::merkle::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::sync::*; use crate::sync::*;
use crate::util::*;
pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@ -45,7 +46,13 @@ pub(crate) enum TableRpc<F: TableSchema> {
ReadEntryResponse(Option<ByteBuf>), ReadEntryResponse(Option<ByteBuf>),
// Read range: read all keys in partition P, possibly starting at a certain sort key offset // Read range: read all keys in partition P, possibly starting at a certain sort key offset
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), ReadRange {
partition: F::P,
begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
enumeration_order: EnumerationOrder,
},
Update(Vec<Arc<ByteBuf>>), Update(Vec<Arc<ByteBuf>>),
} }
@ -261,12 +268,19 @@ where
begin_sort_key: Option<F::S>, begin_sort_key: Option<F::S>,
filter: Option<F::Filter>, filter: Option<F::Filter>,
limit: usize, limit: usize,
enumeration_order: EnumerationOrder,
) -> Result<Vec<F::E>, Error> { ) -> Result<Vec<F::E>, Error> {
let tracer = opentelemetry::global::tracer("garage_table"); let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} get_range", F::TABLE_NAME)); let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
let res = self let res = self
.get_range_internal(partition_key, begin_sort_key, filter, limit) .get_range_internal(
partition_key,
begin_sort_key,
filter,
limit,
enumeration_order,
)
.bound_record_duration(&self.data.metrics.get_request_duration) .bound_record_duration(&self.data.metrics.get_request_duration)
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.await?; .await?;
@ -282,11 +296,18 @@ where
begin_sort_key: Option<F::S>, begin_sort_key: Option<F::S>,
filter: Option<F::Filter>, filter: Option<F::Filter>,
limit: usize, limit: usize,
enumeration_order: EnumerationOrder,
) -> Result<Vec<F::E>, Error> { ) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash(); let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash); let who = self.data.replication.read_nodes(&hash);
let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let rpc = TableRpc::<F>::ReadRange {
partition: partition_key.clone(),
begin_sort_key,
filter,
limit,
enumeration_order,
};
let resps = self let resps = self
.system .system
@ -302,44 +323,65 @@ where
) )
.await?; .await?;
let mut ret = BTreeMap::new(); let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
let mut to_repair = BTreeMap::new(); let mut to_repair = BTreeSet::new();
for resp in resps { for resp in resps {
if let TableRpc::Update(entries) = resp { if let TableRpc::Update(entries) = resp {
for entry_bytes in entries.iter() { for entry_bytes in entries.iter() {
let entry = self.data.decode_entry(entry_bytes.as_slice())?; let entry = self.data.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) { match ret.get_mut(&entry_key) {
None => { Some(e) => {
ret.insert(entry_key, Some(entry)); if *e != entry {
} e.merge(&entry);
Some(Some(mut prev)) => { to_repair.insert(entry_key.clone());
let must_repair = prev != entry;
prev.merge(&entry);
if must_repair {
to_repair.insert(entry_key.clone(), Some(prev.clone()));
} }
ret.insert(entry_key, Some(prev));
} }
Some(None) => unreachable!(), None => {
ret.insert(entry_key, entry);
}
} }
} }
} else {
return Err(Error::unexpected_rpc_message(resp));
} }
} }
if !to_repair.is_empty() { if !to_repair.is_empty() {
let self2 = self.clone(); let self2 = self.clone();
let to_repair = to_repair
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
self.system.background.spawn_cancellable(async move { self.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() { for v in to_repair {
self2.repair_on_read(&who[..], v.take().unwrap()).await?; self2.repair_on_read(&who[..], v).await?;
} }
Ok(()) Ok(())
}); });
} }
let ret_vec = ret
.iter_mut() // At this point, the `ret` btreemap might contain more than `limit`
.take(limit) // items, because nodes might have returned us each `limit` items
.map(|(_k, v)| v.take().unwrap()) // but for different keys. We have to take only the first `limit` items
.collect::<Vec<_>>(); // in this map, in the specified enumeration order, for two reasons:
// 1. To return to the user no more than the number of items that they requested
// 2. To return only items for which we have a read quorum: we do not know
// that we have a read quorum for the items after the first `limit`
// of them
let ret_vec = match enumeration_order {
EnumerationOrder::Forward => ret
.into_iter()
.take(limit)
.map(|(_k, v)| v)
.collect::<Vec<_>>(),
EnumerationOrder::Reverse => ret
.into_iter()
.rev()
.take(limit)
.map(|(_k, v)| v)
.collect::<Vec<_>>(),
};
Ok(ret_vec) Ok(ret_vec)
} }
@ -378,8 +420,20 @@ where
let value = self.data.read_entry(key, sort_key)?; let value = self.data.read_entry(key, sort_key)?;
Ok(TableRpc::ReadEntryResponse(value)) Ok(TableRpc::ReadEntryResponse(value))
} }
TableRpc::ReadRange(key, begin_sort_key, filter, limit) => { TableRpc::ReadRange {
let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; partition,
begin_sort_key,
filter,
limit,
enumeration_order,
} => {
let values = self.data.read_range(
partition,
begin_sort_key,
filter,
*limit,
*enumeration_order,
)?;
Ok(TableRpc::Update(values)) Ok(TableRpc::Update(values))
} }
TableRpc::Update(pairs) => { TableRpc::Update(pairs) => {

View file

@ -33,3 +33,19 @@ impl DeletedFilter {
} }
} }
} }
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum EnumerationOrder {
Forward,
Reverse,
}
impl EnumerationOrder {
pub fn from_reverse(reverse: bool) -> Self {
if reverse {
Self::Reverse
} else {
Self::Forward
}
}
}