diff --git a/k2v_test.py b/k2v_test.py index eecffbc3..5fa91efd 100755 --- a/k2v_test.py +++ b/k2v_test.py @@ -100,7 +100,7 @@ response = requests.post('http://localhost:3812/alex', {"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="}, {"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="} ] -'''); +''') print(response.headers) print(response.text) @@ -117,3 +117,17 @@ for sk in sort_keys: print(response.headers) print(response.text) ct = response.headers["x-garage-causality-token"] + +print("-- ReadBatch") +response = requests.post('http://localhost:3812/alex?search', + auth=auth, + data=''' +[ + {"partitionKey": "root"}, + {"partitionKey": "root", "tombstones": true}, + {"partitionKey": "root", "tombstones": true, "limit": 2}, + {"partitionKey": "root", "start": "c", "singleItem": true} +] +''') +print(response.headers) +print(response.text) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index dfe66d0b..04d54e56 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -149,6 +149,7 @@ impl ApiHandler for K2VApiServer { limit, } => handle_read_index(garage, bucket_id, prefix, start, end, limit).await, Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, + Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, //TODO endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 7568f0c9..d17756ca 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,8 +4,7 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; - -use garage_table::util::*; +use garage_util::error::Error as GarageError; use garage_model::garage::Garage; use garage_model::k2v::causality::*; @@ -46,6 +45,91 @@ pub async fn handle_insert_batch( .body(Body::empty())?) } +pub async fn handle_read_batch( + garage: Arc, + bucket_id: Uuid, + req: Request, +) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let queries: Vec = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let resp_results = futures::future::join_all( + queries.into_iter() + .map(|q| handle_read_batch_query(&garage, bucket_id, q))) + .await; + + let mut resps: Vec = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_read_batch_query( + garage: &Arc, + bucket_id: Uuid, + query: ReadBatchQuery, +) -> Result { + let partition = K2VItemPartition{ + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: !query.tombstones, + conflicts_only: query.conflicts_only, + }; + + let (items, more, next_start) = if query.single_item { + let sk = query.start.as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v_item_table + .get(&partition, sk) + .await?; + match item { + Some(i) => (vec![ReadBatchResponseItem::from(i)], + false, None), + None => (vec![], false, None), + } + } else { + let (items, more, next_start) = read_range( + &garage.k2v_item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + query.limit, + Some(filter) + ).await?; + + let items = items.into_iter() + .map(|i| ReadBatchResponseItem::from(i)) + .collect::>(); + + (items, more, next_start) + }; + + Ok(ReadBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + limit: query.limit, + single_item: query.single_item, + conflicts_only: query.conflicts_only, + tombstones: query.tombstones, + items, + more, + next_start, + }) +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -53,3 +137,68 @@ struct InsertBatchItem { ct: Option, v: Option, } + +#[derive(Deserialize)] +struct ReadBatchQuery { + #[serde(rename="partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, + #[serde(default)] + limit: Option, + #[serde(default,rename="singleItem")] + single_item: bool, + #[serde(default,rename="conflictsOnly")] + conflicts_only: bool, + #[serde(default)] + tombstones: bool, +} + +#[derive(Serialize)] +struct ReadBatchResponse { + #[serde(rename="partitionKey")] + partition_key: String, + prefix: Option, + start: Option, + end: Option, + limit: Option, + #[serde(rename="singleItem")] + single_item: bool, + #[serde(rename="conflictsOnly")] + conflicts_only: bool, + tombstones: bool, + + items: Vec, + more: bool, + #[serde(rename="nextStart")] + next_start: Option, +} + +#[derive(Serialize)] +struct ReadBatchResponseItem { + sk: String, + ct: String, + v: Vec>, +} + +impl ReadBatchResponseItem { + fn from(i: K2VItem) -> Self { + let ct = i.causality_context().serialize(); + let v = i.values() + .iter() + .map(|v| match v { + DvvsValue::Value(x) => Some(base64::encode(x)), + DvvsValue::Deleted => None, + }) + .collect::>(); + Self { + sk: i.sort_key, + ct, + v, + } + } +} diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index 56e77df9..f545fab7 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -73,12 +73,14 @@ impl Endpoint { let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + let method_search = Method::from_bytes(b"SEARCH").unwrap(); let res = match *req.method() { Method::GET => Self::from_get(partition_key, &mut query)?, - //Method::HEAD => Self::from_head(partition_key, &mut query)?, + //&Method::HEAD => Self::from_head(partition_key, &mut query)?, Method::POST => Self::from_post(partition_key, &mut query)?, Method::PUT => Self::from_put(partition_key, &mut query)?, Method::DELETE => Self::from_delete(partition_key, &mut query)?, + _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?, _ => return Err(Error::BadRequest("Unknown method".to_owned())), }; @@ -103,6 +105,19 @@ impl Endpoint { } } + /// Determine which endpoint a request is for, knowing it is a SEARCH. + fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + ], + no_key: [ + EMPTY => ReadBatch, + ] + } + } + /* /// Determine which endpoint a request is for, knowing it is a HEAD. fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result {