K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
2 changed files with 76 additions and 0 deletions
Showing only changes of commit 7cdec31e10 - Show all commits

View file

@ -123,6 +123,14 @@ impl ApiHandler for K2VApiServer {
}; };
let resp = match endpoint { let resp = match endpoint {
Endpoint::DeleteItem {
partition_key,
sort_key,
} => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
Endpoint::InsertItem {
partition_key,
sort_key,
} => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
Endpoint::ReadItem { Endpoint::ReadItem {
partition_key, partition_key,
sort_key, sort_key,

View file

@ -8,6 +8,7 @@ use hyper::{Body, Request, Response, StatusCode};
use garage_util::data::*; use garage_util::data::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*; use garage_model::k2v::item_table::*;
use crate::error::*; use crate::error::*;
@ -119,3 +120,70 @@ pub async fn handle_read_item(
format.make_response(&item) format.make_response(&item)
} }
pub async fn handle_insert_item(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<Body>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
.map(CausalContext::parse)
.transpose()?;
let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec());
garage
.k2v_rpc
.insert(
bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
value,
)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
pub async fn handle_delete_item(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
) -> Result<Response<Body>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
.map(CausalContext::parse)
.transpose()?;
let value = DvvsValue::Deleted;
garage
.k2v_rpc
.insert(
bucket_id,
partition_key.to_string(),
sort_key.to_string(),
causal_context,
value,
)
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}