From 7a876cf94dacfc5ab789a8844b8e40f87bd92fac Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Apr 2022 13:37:25 +0200 Subject: [PATCH] Implement DeleteBatch --- k2v_test.py | 26 ++++++- src/api/k2v/api_server.rs | 1 + src/api/k2v/batch.rs | 143 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+), 1 deletion(-) diff --git a/k2v_test.py b/k2v_test.py index c3663a72..3219056e 100755 --- a/k2v_test.py +++ b/k2v_test.py @@ -98,7 +98,8 @@ response = requests.post('http://localhost:3812/alex', data=''' [ {"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="}, - {"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="} + {"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="}, + {"pk": "root", "sk": "c", "ct": null, "v": "aW5pdGlhbCB0ZXN2Cg=="} ] ''') print(response.headers) @@ -132,3 +133,26 @@ response = requests.post('http://localhost:3812/alex?search', ''') print(response.headers) print(response.text) + + +print("-- DeleteBatch") +response = requests.post('http://localhost:3812/alex?delete', + auth=auth, + data=''' +[ + {"partitionKey": "root", "start": "b", "end": "c"} +] +''') +print(response.headers) +print(response.text) + +print("-- ReadBatch") +response = requests.post('http://localhost:3812/alex?search', + auth=auth, + data=''' +[ + {"partitionKey": "root"} +] +''') +print(response.headers) +print(response.text) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 04d54e56..edfd9da8 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -150,6 +150,7 @@ impl ApiHandler for K2VApiServer { } => handle_read_index(garage, bucket_id, prefix, start, end, limit).await, Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, //TODO endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 207bcb91..c27fdb6c 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -141,6 +141,121 @@ async fn handle_read_batch_query( }) } +pub async fn handle_delete_batch( + garage: Arc, + bucket_id: Uuid, + req: Request, +) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let queries: Vec = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let resp_results = futures::future::join_all( + queries + .into_iter() + .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + ) + .await; + + let mut resps: Vec = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_delete_batch_query( + garage: &Arc, + bucket_id: Uuid, + query: DeleteBatchQuery, +) -> Result { + let partition = K2VItemPartition { + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: true, + conflicts_only: false, + }; + + let deleted_items = if query.single_item { + if query.prefix.is_some() || query.end.is_some() { + return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into())); + } + let sk = query + .start + .as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v_item_table + .get(&partition, sk) + .await? + .filter(|e| K2VItemTable::matches_filter(e, &filter)); + match item { + Some(i) => { + let cc = i.causality_context(); + garage + .k2v_rpc + .insert( + bucket_id, + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + .await?; + 1 + } + None => 0, + } + } else { + let (items, more, _next_start) = read_range( + &garage.k2v_item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + None, + Some(filter), + ) + .await?; + assert!(!more); + + // TODO delete items + let items = items + .into_iter() + .map(|i| { + let cc = i.causality_context(); + ( + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + }) + .collect::>(); + let n = items.len(); + + garage.k2v_rpc.insert_batch(bucket_id, items).await?; + + n + }; + + Ok(DeleteBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + single_item: query.single_item, + deleted_items, + }) +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -214,3 +329,31 @@ impl ReadBatchResponseItem { } } } + +#[derive(Deserialize)] +struct DeleteBatchQuery { + #[serde(rename = "partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, + #[serde(default, rename = "singleItem")] + single_item: bool, +} + +#[derive(Serialize)] +struct DeleteBatchResponse { + #[serde(rename = "partitionKey")] + partition_key: String, + prefix: Option, + start: Option, + end: Option, + #[serde(rename = "singleItem")] + single_item: bool, + + #[serde(rename = "deletedItems")] + deleted_items: usize, +}