Implement DeleteBatch
This commit is contained in:
parent
ae0e6c6d27
commit
7a876cf94d
3 changed files with 169 additions and 1 deletions
26
k2v_test.py
26
k2v_test.py
|
@ -98,7 +98,8 @@ response = requests.post('http://localhost:3812/alex',
|
|||
data='''
|
||||
[
|
||||
{"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="},
|
||||
{"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="}
|
||||
{"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="},
|
||||
{"pk": "root", "sk": "c", "ct": null, "v": "aW5pdGlhbCB0ZXN2Cg=="}
|
||||
]
|
||||
''')
|
||||
print(response.headers)
|
||||
|
@ -132,3 +133,26 @@ response = requests.post('http://localhost:3812/alex?search',
|
|||
''')
|
||||
print(response.headers)
|
||||
print(response.text)
|
||||
|
||||
|
||||
print("-- DeleteBatch")
|
||||
response = requests.post('http://localhost:3812/alex?delete',
|
||||
auth=auth,
|
||||
data='''
|
||||
[
|
||||
{"partitionKey": "root", "start": "b", "end": "c"}
|
||||
]
|
||||
''')
|
||||
print(response.headers)
|
||||
print(response.text)
|
||||
|
||||
print("-- ReadBatch")
|
||||
response = requests.post('http://localhost:3812/alex?search',
|
||||
auth=auth,
|
||||
data='''
|
||||
[
|
||||
{"partitionKey": "root"}
|
||||
]
|
||||
''')
|
||||
print(response.headers)
|
||||
print(response.text)
|
||||
|
|
|
@ -150,6 +150,7 @@ impl ApiHandler for K2VApiServer {
|
|||
} => handle_read_index(garage, bucket_id, prefix, start, end, limit).await,
|
||||
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
|
||||
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
|
||||
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
|
||||
//TODO
|
||||
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
||||
};
|
||||
|
|
|
@ -141,6 +141,121 @@ async fn handle_read_batch_query(
|
|||
})
|
||||
}
|
||||
|
||||
pub async fn handle_delete_batch(
|
||||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let queries: Vec<DeleteBatchQuery> =
|
||||
serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
|
||||
|
||||
let resp_results = futures::future::join_all(
|
||||
queries
|
||||
.into_iter()
|
||||
.map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut resps: Vec<DeleteBatchResponse> = vec![];
|
||||
for resp in resp_results {
|
||||
resps.push(resp?);
|
||||
}
|
||||
|
||||
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::from(resp_json))?)
|
||||
}
|
||||
|
||||
async fn handle_delete_batch_query(
|
||||
garage: &Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
query: DeleteBatchQuery,
|
||||
) -> Result<DeleteBatchResponse, Error> {
|
||||
let partition = K2VItemPartition {
|
||||
bucket_id,
|
||||
partition_key: query.partition_key.clone(),
|
||||
};
|
||||
|
||||
let filter = ItemFilter {
|
||||
exclude_only_tombstones: true,
|
||||
conflicts_only: false,
|
||||
};
|
||||
|
||||
let deleted_items = if query.single_item {
|
||||
if query.prefix.is_some() || query.end.is_some() {
|
||||
return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into()));
|
||||
}
|
||||
let sk = query
|
||||
.start
|
||||
.as_ref()
|
||||
.ok_or_bad_request("start should be specified if single_item is set")?;
|
||||
let item = garage
|
||||
.k2v_item_table
|
||||
.get(&partition, sk)
|
||||
.await?
|
||||
.filter(|e| K2VItemTable::matches_filter(e, &filter));
|
||||
match item {
|
||||
Some(i) => {
|
||||
let cc = i.causality_context();
|
||||
garage
|
||||
.k2v_rpc
|
||||
.insert(
|
||||
bucket_id,
|
||||
i.partition.partition_key,
|
||||
i.sort_key,
|
||||
Some(cc),
|
||||
DvvsValue::Deleted,
|
||||
)
|
||||
.await?;
|
||||
1
|
||||
}
|
||||
None => 0,
|
||||
}
|
||||
} else {
|
||||
let (items, more, _next_start) = read_range(
|
||||
&garage.k2v_item_table,
|
||||
&partition,
|
||||
&query.prefix,
|
||||
&query.start,
|
||||
&query.end,
|
||||
None,
|
||||
Some(filter),
|
||||
)
|
||||
.await?;
|
||||
assert!(!more);
|
||||
|
||||
// TODO delete items
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(|i| {
|
||||
let cc = i.causality_context();
|
||||
(
|
||||
i.partition.partition_key,
|
||||
i.sort_key,
|
||||
Some(cc),
|
||||
DvvsValue::Deleted,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let n = items.len();
|
||||
|
||||
garage.k2v_rpc.insert_batch(bucket_id, items).await?;
|
||||
|
||||
n
|
||||
};
|
||||
|
||||
Ok(DeleteBatchResponse {
|
||||
partition_key: query.partition_key,
|
||||
prefix: query.prefix,
|
||||
start: query.start,
|
||||
end: query.end,
|
||||
single_item: query.single_item,
|
||||
deleted_items,
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct InsertBatchItem {
|
||||
pk: String,
|
||||
|
@ -214,3 +329,31 @@ impl ReadBatchResponseItem {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct DeleteBatchQuery {
|
||||
#[serde(rename = "partitionKey")]
|
||||
partition_key: String,
|
||||
#[serde(default)]
|
||||
prefix: Option<String>,
|
||||
#[serde(default)]
|
||||
start: Option<String>,
|
||||
#[serde(default)]
|
||||
end: Option<String>,
|
||||
#[serde(default, rename = "singleItem")]
|
||||
single_item: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct DeleteBatchResponse {
|
||||
#[serde(rename = "partitionKey")]
|
||||
partition_key: String,
|
||||
prefix: Option<String>,
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
#[serde(rename = "singleItem")]
|
||||
single_item: bool,
|
||||
|
||||
#[serde(rename = "deletedItems")]
|
||||
deleted_items: usize,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue