Test reverse and actually implement it correctly
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
Alex 2022-04-28 11:28:16 +02:00
parent 226439f2da
commit 7362618b97
Signed by: lx
GPG key ID: 0E496D15096376BE
6 changed files with 173 additions and 84 deletions

View file

@ -137,6 +137,32 @@ pub fn parse_bucket_key<'a>(
Ok((bucket, key))
}
const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
/// Compute the key after the prefix
pub fn key_after_prefix(pfx: &str) -> Option<String> {
let mut next = pfx.to_string();
while !next.is_empty() {
let tail = next.pop().unwrap();
if tail >= char::MAX {
continue;
}
// Circumvent a limitation of RangeFrom that overflow earlier than needed
// See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
char::MAX
} else {
(tail..).nth(1).unwrap()
};
next.push(new_tail);
return Some(next);
}
None
}
#[cfg(test)]
mod tests {
use super::*;
@ -236,4 +262,37 @@ mod tests {
assert_eq!(host_to_bucket("not-garage.tld", "garage.tld"), None);
assert_eq!(host_to_bucket("not-garage.tld", ".garage.tld"), None);
}
#[test]
fn test_key_after_prefix() {
assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
assert_eq!(key_after_prefix("").unwrap().as_str(), "");
assert_eq!(
key_after_prefix("􏿽").unwrap().as_str(),
String::from(char::from_u32(0x10FFFE).unwrap())
);
// When the last character is the biggest UTF8 char
let a = String::from_iter(['a', char::MAX].iter());
assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
// When all characters are the biggest UTF8 char
let b = String::from_iter([char::MAX; 3].iter());
assert!(key_after_prefix(b.as_str()).is_none());
// Check utf8 surrogates
let c = String::from('\u{D7FF}');
assert_eq!(
key_after_prefix(c.as_str()).unwrap().as_str(),
String::from('\u{E000}')
);
// Check the character before the biggest one
let d = String::from('\u{10FFFE}');
assert_eq!(
key_after_prefix(d.as_str()).unwrap().as_str(),
String::from(char::MAX)
);
}
}

View file

@ -90,8 +90,8 @@ async fn handle_read_batch_query(
};
let (items, more, next_start) = if query.single_item {
if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() {
return Err(Error::BadRequest("Batch query parameters 'prefix', 'end' and 'limit' must not be set when singleItem is true.".into()));
if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into()));
}
let sk = query
.start

View file

@ -8,6 +8,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::error::*;
use crate::helpers::key_after_prefix;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
@ -25,10 +26,9 @@ pub(crate) async fn read_range<F>(
where
F: TableSchema<S = String> + 'static,
{
let mut start = match (prefix, start) {
(None, None) => "".to_string(),
(Some(p), None) => p.clone(),
(None, Some(s)) => s.clone(),
let (mut start, mut start_ignore) = match (prefix, start) {
(None, None) => (None, false),
(None, Some(s)) => (Some(s.clone()), false),
(Some(p), Some(s)) => {
if !s.starts_with(p) {
return Err(Error::BadRequest(format!(
@ -36,10 +36,15 @@ where
s, p
)));
}
s.clone()
(Some(s.clone()), false)
}
(Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
let start = key_after_prefix(p)
.ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
(Some(start), true)
}
(Some(p), None) => (Some(p.clone()), false),
};
let mut start_ignore = false;
let mut entries = vec![];
loop {
@ -50,7 +55,7 @@ where
let get_ret = table
.get_range(
partition_key,
Some(start.clone()),
start.clone(),
filter.clone(),
n_get,
enumeration_order,
@ -60,6 +65,9 @@ where
let get_ret_len = get_ret.len();
for entry in get_ret {
if start_ignore && Some(entry.sort_key()) == start.as_ref() {
continue;
}
if let Some(p) = prefix {
if !entry.sort_key().starts_with(p) {
return Ok((entries, false, None));
@ -75,9 +83,6 @@ where
return Ok((entries, true, Some(entry.sort_key().clone())));
}
}
if start_ignore && entry.sort_key() == &start {
continue;
}
entries.push(entry);
}
@ -85,7 +90,7 @@ where
return Ok((entries, false, None));
}
start = entries.last().unwrap().sort_key().clone();
start = Some(entries.last().unwrap().sort_key().clone());
start_ignore = true;
}
}

View file

@ -17,6 +17,7 @@ use garage_table::{EmptyKey, EnumerationOrder};
use crate::encoding::*;
use crate::error::*;
use crate::helpers::key_after_prefix;
use crate::s3::put as s3_put;
use crate::s3::xml as s3_xml;
@ -935,32 +936,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
}
}
const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
/// Compute the key after the prefix
fn key_after_prefix(pfx: &str) -> Option<String> {
let mut next = pfx.to_string();
while !next.is_empty() {
let tail = next.pop().unwrap();
if tail >= char::MAX {
continue;
}
// Circumvent a limitation of RangeFrom that overflow earlier than needed
// See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
char::MAX
} else {
(tail..).nth(1).unwrap()
};
next.push(new_tail);
return Some(next);
}
None
}
/*
* Unit tests of this module
*/
@ -1014,39 +989,6 @@ mod tests {
}
}
#[test]
fn test_key_after_prefix() {
assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
assert_eq!(key_after_prefix("").unwrap().as_str(), "");
assert_eq!(
key_after_prefix("􏿽").unwrap().as_str(),
String::from(char::from_u32(0x10FFFE).unwrap())
);
// When the last character is the biggest UTF8 char
let a = String::from_iter(['a', char::MAX].iter());
assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
// When all characters are the biggest UTF8 char
let b = String::from_iter([char::MAX; 3].iter());
assert!(key_after_prefix(b.as_str()).is_none());
// Check utf8 surrogates
let c = String::from('\u{D7FF}');
assert_eq!(
key_after_prefix(c.as_str()).unwrap().as_str(),
String::from('\u{E000}')
);
// Check the character before the biggest one
let d = String::from('\u{10FFFE}');
assert_eq!(
key_after_prefix(d.as_str()).unwrap().as_str(),
String::from(char::MAX)
);
}
#[test]
fn test_common_prefixes() {
let mut query = query();

View file

@ -92,6 +92,7 @@ async fn test_batch() {
br#"[
{"partitionKey": "root"},
{"partitionKey": "root", "start": "c"},
{"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
{"partitionKey": "root", "limit": 1},
{"partitionKey": "root", "prefix": "d"}
]"#
@ -146,6 +147,23 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
"start": "c",
"end": "a",
"limit": null,
"reverse": true,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
@ -252,6 +270,8 @@ async fn test_batch() {
{"partitionKey": "root", "prefix": "d.", "end": "d.2"},
{"partitionKey": "root", "prefix": "d.", "limit": 1},
{"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1},
{"partitionKey": "root", "prefix": "d.", "reverse": true},
{"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true},
{"partitionKey": "root", "prefix": "d.", "limit": 2}
]"#
.to_vec(),
@ -350,6 +370,40 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": "d.",
"start": null,
"end": null,
"limit": null,
"reverse": true,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": "d.",
"start": "d.2",
"end": null,
"limit": null,
"reverse": true,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": "d.",
@ -418,7 +472,8 @@ async fn test_batch() {
.query_param("search", Option::<&str>::None)
.body(
br#"[
{"partitionKey": "root"}
{"partitionKey": "root"},
{"partitionKey": "root", "reverse": true}
]"#
.to_vec(),
)
@ -448,6 +503,23 @@ async fn test_batch() {
"more": false,
"nextStart": null,
},
{
"partitionKey": "root",
"prefix": null,
"start": null,
"end": null,
"limit": null,
"reverse": true,
"conflictsOnly": false,
"tombstones": false,
"singleItem": false,
"items": [
{"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
{"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
],
"more": false,
"nextStart": null,
},
])
);
}

View file

@ -1,4 +1,5 @@
use core::borrow::Borrow;
use std::convert::TryInto;
use std::sync::Arc;
use serde_bytes::ByteBuf;
@ -84,26 +85,36 @@ where
pub fn read_range(
&self,
p: &F::P,
s: &Option<F::S>,
partition_key: &F::P,
start: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
enumeration_order: EnumerationOrder,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
let first_key = match s {
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(p, sk),
};
let partition_hash = partition_key.hash();
match enumeration_order {
EnumerationOrder::Forward => {
let first_key = match start {
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(partition_key, sk),
};
let range = self.store.range(first_key..);
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => {
let range = self.store.range(..=first_key).rev();
self.read_range_aux(partition_hash, range, filter, limit)
}
EnumerationOrder::Reverse => match start {
Some(sk) => {
let last_key = self.tree_key(partition_key, sk);
let range = self.store.range(..=last_key).rev();
self.read_range_aux(partition_hash, range, filter, limit)
}
None => {
let mut last_key = partition_hash.to_vec();
let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
let range = self.store.range(..last_key).rev();
self.read_range_aux(partition_hash, range, filter, limit)
}
},
}
}