K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
3 changed files with 48 additions and 35 deletions
Showing only changes of commit ae0e6c6d27 - Show all commits

View file

@ -126,7 +126,8 @@ response = requests.post('http://localhost:3812/alex?search',
{"partitionKey": "root"}, {"partitionKey": "root"},
{"partitionKey": "root", "tombstones": true}, {"partitionKey": "root", "tombstones": true},
{"partitionKey": "root", "tombstones": true, "limit": 2}, {"partitionKey": "root", "tombstones": true, "limit": 2},
{"partitionKey": "root", "start": "c", "singleItem": true} {"partitionKey": "root", "start": "c", "singleItem": true},
{"partitionKey": "root", "start": "b", "end": "d", "tombstones": true}
] ]
''') ''')
print(response.headers) print(response.headers)

View file

@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_table::TableSchema;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::k2v::causality::*; use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*; use garage_model::k2v::item_table::*;
@ -55,8 +57,10 @@ pub async fn handle_read_batch(
serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
let resp_results = futures::future::join_all( let resp_results = futures::future::join_all(
queries.into_iter() queries
.map(|q| handle_read_batch_query(&garage, bucket_id, q))) .into_iter()
.map(|q| handle_read_batch_query(&garage, bucket_id, q)),
)
.await; .await;
let mut resps: Vec<ReadBatchResponse> = vec![]; let mut resps: Vec<ReadBatchResponse> = vec![];
@ -75,7 +79,7 @@ async fn handle_read_batch_query(
bucket_id: Uuid, bucket_id: Uuid,
query: ReadBatchQuery, query: ReadBatchQuery,
) -> Result<ReadBatchResponse, Error> { ) -> Result<ReadBatchResponse, Error> {
let partition = K2VItemPartition{ let partition = K2VItemPartition {
bucket_id, bucket_id,
partition_key: query.partition_key.clone(), partition_key: query.partition_key.clone(),
}; };
@ -86,15 +90,20 @@ async fn handle_read_batch_query(
}; };
let (items, more, next_start) = if query.single_item { let (items, more, next_start) = if query.single_item {
let sk = query.start.as_ref() if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() {
return Err(Error::BadRequest("Batch query parameters 'prefix', 'end' and 'limit' must not be set when singleItem is true.".into()));
}
let sk = query
.start
.as_ref()
.ok_or_bad_request("start should be specified if single_item is set")?; .ok_or_bad_request("start should be specified if single_item is set")?;
let item = garage let item = garage
.k2v_item_table .k2v_item_table
.get(&partition, sk) .get(&partition, sk)
.await?; .await?
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item { match item {
Some(i) => (vec![ReadBatchResponseItem::from(i)], Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
false, None),
None => (vec![], false, None), None => (vec![], false, None),
} }
} else { } else {
@ -105,11 +114,13 @@ async fn handle_read_batch_query(
&query.start, &query.start,
&query.end, &query.end,
query.limit, query.limit,
Some(filter) Some(filter),
).await?; )
.await?;
let items = items.into_iter() let items = items
.map(|i| ReadBatchResponseItem::from(i)) .into_iter()
.map(ReadBatchResponseItem::from)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
(items, more, next_start) (items, more, next_start)
@ -140,7 +151,7 @@ struct InsertBatchItem {
#[derive(Deserialize)] #[derive(Deserialize)]
struct ReadBatchQuery { struct ReadBatchQuery {
#[serde(rename="partitionKey")] #[serde(rename = "partitionKey")]
partition_key: String, partition_key: String,
#[serde(default)] #[serde(default)]
prefix: Option<String>, prefix: Option<String>,
@ -150,9 +161,9 @@ struct ReadBatchQuery {
end: Option<String>, end: Option<String>,
#[serde(default)] #[serde(default)]
limit: Option<u64>, limit: Option<u64>,
#[serde(default,rename="singleItem")] #[serde(default, rename = "singleItem")]
single_item: bool, single_item: bool,
#[serde(default,rename="conflictsOnly")] #[serde(default, rename = "conflictsOnly")]
conflicts_only: bool, conflicts_only: bool,
#[serde(default)] #[serde(default)]
tombstones: bool, tombstones: bool,
@ -160,21 +171,21 @@ struct ReadBatchQuery {
#[derive(Serialize)] #[derive(Serialize)]
struct ReadBatchResponse { struct ReadBatchResponse {
#[serde(rename="partitionKey")] #[serde(rename = "partitionKey")]
partition_key: String, partition_key: String,
prefix: Option<String>, prefix: Option<String>,
start: Option<String>, start: Option<String>,
end: Option<String>, end: Option<String>,
limit: Option<u64>, limit: Option<u64>,
#[serde(rename="singleItem")] #[serde(rename = "singleItem")]
single_item: bool, single_item: bool,
#[serde(rename="conflictsOnly")] #[serde(rename = "conflictsOnly")]
conflicts_only: bool, conflicts_only: bool,
tombstones: bool, tombstones: bool,
items: Vec<ReadBatchResponseItem>, items: Vec<ReadBatchResponseItem>,
more: bool, more: bool,
#[serde(rename="nextStart")] #[serde(rename = "nextStart")]
next_start: Option<String>, next_start: Option<String>,
} }
@ -188,7 +199,8 @@ struct ReadBatchResponseItem {
impl ReadBatchResponseItem { impl ReadBatchResponseItem {
fn from(i: K2VItem) -> Self { fn from(i: K2VItem) -> Self {
let ct = i.causality_context().serialize(); let ct = i.causality_context().serialize();
let v = i.values() let v = i
.values()
.iter() .iter()
.map(|v| match v { .map(|v| match v {
DvvsValue::Value(x) => Some(base64::encode(x)), DvvsValue::Value(x) => Some(base64::encode(x)),

View file

@ -2,10 +2,10 @@ use garage_util::data::*;
use crate::index_counter::*; use crate::index_counter::*;
pub const ENTRIES: &'static str = "entries"; pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &'static str = "conflicts"; pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &'static str = "values"; pub const VALUES: &str = "values";
pub const BYTES: &'static str = "bytes"; pub const BYTES: &str = "bytes";
#[derive(PartialEq, Clone)] #[derive(PartialEq, Clone)]
pub struct K2VCounterTable; pub struct K2VCounterTable;