Implement PollRange API endpoint
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Alex 2023-01-10 15:22:25 +01:00
parent 57eabe7879
commit b83517d521
Signed by: lx
GPG Key ID: 0E496D15096376BE
6 changed files with 87 additions and 18 deletions

View File

@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer {
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
Endpoint::PollRange { partition_key } => {
handle_poll_range(garage, bucket_id, &partition_key, req).await
}
Endpoint::Options => unreachable!(),
};

View File

@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@ -65,10 +64,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@ -160,10 +156,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@ -257,6 +250,53 @@ async fn handle_delete_batch_query(
})
}
pub(crate) async fn handle_poll_range(
garage: Arc<Garage>,
bucket_id: Uuid,
partition_key: &str,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000;
let resp = garage
.k2v
.rpc
.poll_range(
PollRange {
partition: K2VItemPartition {
bucket_id,
partition_key: partition_key.to_string(),
},
start: query.start,
end: query.end,
prefix: query.prefix,
},
query.seen_marker,
timeout_msec,
)
.await?;
if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse {
items: items
.into_iter()
.map(|(_k, i)| ReadBatchResponseItem::from(i))
.collect::<Vec<_>>(),
seen_marker,
};
Ok(json_ok_response(&resp)?)
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
.body(Body::empty())?)
}
}
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@ -361,3 +401,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
#[derive(Deserialize)]
struct PollRangeQuery {
#[serde(default)]
prefix: Option<String>,
#[serde(default)]
start: Option<String>,
#[serde(default)]
end: Option<String>,
#[serde(default)]
timeout: Option<u64>,
#[serde(default, rename = "seenMarker")]
seen_marker: Option<String>,
}
#[derive(Serialize)]
struct PollRangeResponse {
items: Vec<ReadBatchResponseItem>,
#[serde(rename = "seenMarker")]
seen_marker: String,
}

View File

@ -1,10 +1,9 @@
use std::sync::Arc;
use hyper::{Body, Response, StatusCode};
use hyper::{Body, Response};
use serde::Serialize;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::ring::Ring;
use garage_table::util::*;
@ -12,6 +11,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@ -68,10 +68,7 @@ pub async fn handle_read_index(
next_start,
};
let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(resp_json))?)
Ok(json_ok_response(&resp)?)
}
#[derive(Serialize)]

View File

@ -208,6 +208,8 @@ pub async fn handle_poll_item(
let causal_context =
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000;
let item = garage
.k2v
.rpc
@ -216,7 +218,7 @@ pub async fn handle_poll_item(
partition_key,
sort_key,
causal_context,
timeout_secs.unwrap_or(300) * 1000,
timeout_msec,
)
.await?;

View File

@ -32,6 +32,9 @@ pub enum Endpoint {
causality_token: String,
timeout: Option<u64>,
},
PollRange {
partition_key: String,
},
ReadBatch {
},
ReadIndex {
@ -113,6 +116,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
POLL_RANGE => PollRange,
],
no_key: [
EMPTY => ReadBatch,
@ -142,6 +146,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
POLL_RANGE => PollRange,
],
no_key: [
EMPTY => InsertBatch,
@ -234,7 +239,8 @@ impl Endpoint {
generateQueryParameters! {
keywords: [
"delete" => DELETE,
"search" => SEARCH
"search" => SEARCH,
"poll_range" => POLL_RANGE
],
fields: [
"prefix" => prefix,

View File

@ -5,4 +5,4 @@ pub mod item_table;
pub mod rpc;
pub(crate) mod sub;
pub mod sub;