Implement ReadBatch
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

Alex 2022-04-22 18:00:11 +02:00
parent 99e7c3396c
commit 140994c830
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 183 additions and 4 deletions

View file

@@ -100,7 +100,7 @@ response = requests.post('http://localhost:3812/alex',
     {"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="},
     {"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="}
 ]
-''');
+''')
 print(response.headers)
 print(response.text)
@@ -117,3 +117,17 @@ for sk in sort_keys:
 print(response.headers)
 print(response.text)
 ct = response.headers["x-garage-causality-token"]
+
+print("-- ReadBatch")
+response = requests.post('http://localhost:3812/alex?search',
+        auth=auth,
+        data='''
+[
+    {"partitionKey": "root"},
+    {"partitionKey": "root", "tombstones": true},
+    {"partitionKey": "root", "tombstones": true, "limit": 2},
+    {"partitionKey": "root", "start": "c", "singleItem": true}
+]
+''')
+print(response.headers)
+print(response.text)
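
For orientation, the shape of the reply follows the ReadBatchResponse and ReadBatchResponseItem structs added below: each query in the batch is echoed back together with its matching items. A minimal client-side sketch, reusing the `response` object from the call above (field names come from the serde renames; a null in "v" marks a tombstone):

# Sketch: consume the ReadBatch reply produced by the handler below.
# Assumes `response` is the requests.Response from the call above.
import base64

for result in response.json():
    print("partition:", result["partitionKey"], "more:", result["more"])
    for item in result["items"]:
        # each item carries its sort key "sk", a causality token "ct",
        # and a list "v" of concurrent values; None means a tombstone
        values = [base64.b64decode(v) if v is not None else None
                  for v in item["v"]]
        print("  ", item["sk"], item["ct"], values)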

View file

@@ -149,6 +149,7 @@ impl ApiHandler for K2VApiServer {
 				limit,
 			} => handle_read_index(garage, bucket_id, prefix, start, end, limit).await,
 			Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
+			Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
 			//TODO
 			endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
 		};

View file

@@ -4,8 +4,7 @@ use hyper::{Body, Request, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 
 use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-
+use garage_table::util::*;
 
 use garage_model::garage::Garage;
 use garage_model::k2v::causality::*;
@@ -46,6 +45,91 @@ pub async fn handle_insert_batch(
 		.body(Body::empty())?)
 }
 
+pub async fn handle_read_batch(
+	garage: Arc<Garage>,
+	bucket_id: Uuid,
+	req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+	let body = hyper::body::to_bytes(req.into_body()).await?;
+	let queries: Vec<ReadBatchQuery> =
+		serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+	let resp_results = futures::future::join_all(
+		queries.into_iter()
+			.map(|q| handle_read_batch_query(&garage, bucket_id, q)))
+		.await;
+
+	let mut resps: Vec<ReadBatchResponse> = vec![];
+	for resp in resp_results {
+		resps.push(resp?);
+	}
+
+	let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+	Ok(Response::builder()
+		.status(StatusCode::OK)
+		.body(Body::from(resp_json))?)
+}
+
+async fn handle_read_batch_query(
+	garage: &Arc<Garage>,
+	bucket_id: Uuid,
+	query: ReadBatchQuery,
+) -> Result<ReadBatchResponse, Error> {
+	let partition = K2VItemPartition {
+		bucket_id,
+		partition_key: query.partition_key.clone(),
+	};
+
+	let filter = ItemFilter {
+		exclude_only_tombstones: !query.tombstones,
+		conflicts_only: query.conflicts_only,
+	};
+
+	let (items, more, next_start) = if query.single_item {
+		let sk = query.start.as_ref()
+			.ok_or_bad_request("start should be specified if single_item is set")?;
+		let item = garage
+			.k2v_item_table
+			.get(&partition, sk)
+			.await?;
+		match item {
+			Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
+			None => (vec![], false, None),
+		}
+	} else {
+		let (items, more, next_start) = read_range(
+			&garage.k2v_item_table,
+			&partition,
+			&query.prefix,
+			&query.start,
+			&query.end,
+			query.limit,
+			Some(filter),
+		)
+		.await?;
+		let items = items.into_iter()
+			.map(|i| ReadBatchResponseItem::from(i))
+			.collect::<Vec<_>>();
+		(items, more, next_start)
+	};
+
+	Ok(ReadBatchResponse {
+		partition_key: query.partition_key,
+		prefix: query.prefix,
+		start: query.start,
+		end: query.end,
+		limit: query.limit,
+		single_item: query.single_item,
+		conflicts_only: query.conflicts_only,
+		tombstones: query.tombstones,
+		items,
+		more,
+		next_start,
+	})
+}
+
 #[derive(Deserialize)]
 struct InsertBatchItem {
 	pk: String,
@@ -53,3 +137,68 @@ struct InsertBatchItem {
 	ct: Option<String>,
 	v: Option<String>,
 }
+
+#[derive(Deserialize)]
+struct ReadBatchQuery {
+	#[serde(rename="partitionKey")]
+	partition_key: String,
+	#[serde(default)]
+	prefix: Option<String>,
+	#[serde(default)]
+	start: Option<String>,
+	#[serde(default)]
+	end: Option<String>,
+	#[serde(default)]
+	limit: Option<u64>,
+	#[serde(default,rename="singleItem")]
+	single_item: bool,
+	#[serde(default,rename="conflictsOnly")]
+	conflicts_only: bool,
+	#[serde(default)]
+	tombstones: bool,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponse {
+	#[serde(rename="partitionKey")]
+	partition_key: String,
+	prefix: Option<String>,
+	start: Option<String>,
+	end: Option<String>,
+	limit: Option<u64>,
+	#[serde(rename="singleItem")]
+	single_item: bool,
+	#[serde(rename="conflictsOnly")]
+	conflicts_only: bool,
+	tombstones: bool,
+
+	items: Vec<ReadBatchResponseItem>,
+	more: bool,
+	#[serde(rename="nextStart")]
+	next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponseItem {
+	sk: String,
+	ct: String,
+	v: Vec<Option<String>>,
+}
+
+impl ReadBatchResponseItem {
+	fn from(i: K2VItem) -> Self {
+		let ct = i.causality_context().serialize();
+		let v = i.values()
+			.iter()
+			.map(|v| match v {
+				DvvsValue::Value(x) => Some(base64::encode(x)),
+				DvvsValue::Deleted => None,
+			})
+			.collect::<Vec<_>>();
+		Self {
+			sk: i.sort_key,
+			ct,
+			v,
+		}
+	}
+}
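
A sketch of how the tombstones flag interacts with the filter above: since exclude_only_tombstones is set to !query.tombstones, a deleted item vanishes from a plain ReadBatch but reappears with "v": [null] when "tombstones": true is passed. Hypothetical continuation of the test script, assuming `ct` holds the causality token of item "a" and relying on InsertBatchItem.v being optional, so that a null value acts as a delete:

# Delete item "a" by inserting a null value with its causality token,
# then run ReadBatch with and without the tombstones flag.
response = requests.post('http://localhost:3812/alex',
        auth=auth,
        data='[{"pk": "root", "sk": "a", "ct": "%s", "v": null}]' % ct)

for tombstones in ('false', 'true'):
    response = requests.post('http://localhost:3812/alex?search',
            auth=auth,
            data='[{"partitionKey": "root", "tombstones": %s}]' % tombstones)
    # with tombstones=true, the deleted item should surface with "v": [null]
    print(tombstones, response.text)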

View file

@@ -73,12 +73,14 @@ impl Endpoint {
 		let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
 
+		let method_search = Method::from_bytes(b"SEARCH").unwrap();
 		let res = match *req.method() {
 			Method::GET => Self::from_get(partition_key, &mut query)?,
-			//Method::HEAD => Self::from_head(partition_key, &mut query)?,
+			//&Method::HEAD => Self::from_head(partition_key, &mut query)?,
 			Method::POST => Self::from_post(partition_key, &mut query)?,
 			Method::PUT => Self::from_put(partition_key, &mut query)?,
 			Method::DELETE => Self::from_delete(partition_key, &mut query)?,
+			_ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
 			_ => return Err(Error::BadRequest("Unknown method".to_owned())),
 		};
@@ -103,6 +105,19 @@ impl Endpoint {
 		}
 	}
 
+	/// Determine which endpoint a request is for, knowing it is a SEARCH.
+	fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+		router_match! {
+			@gen_parser
+			(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+			key: [
+			],
+			no_key: [
+				EMPTY => ReadBatch,
+			]
+		}
+	}
+
 	/*
 	/// Determine which endpoint a request is for, knowing it is a HEAD.
 	fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
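
Note on routing: with the lines added above, ReadBatch is reachable through the custom SEARCH HTTP verb (Method::from_bytes(b"SEARCH") plus the match guard), while the test script reaches it through POST with the ?search keyword. A minimal sketch of the SEARCH-verb variant, assuming the same local endpoint and auth as the test script:

# Same ReadBatch query as in the test script, but sent with the custom
# SEARCH method that the router matches via the method_search guard.
response = requests.request('SEARCH', 'http://localhost:3812/alex',
        auth=auth,
        data='[{"partitionKey": "root"}]')
print(response.text)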