K2V #293
5 changed files with 157 additions and 30 deletions
|
@ -100,6 +100,10 @@ pub enum Error {
|
||||||
#[error(display = "Bad request: {}", _0)]
|
#[error(display = "Bad request: {}", _0)]
|
||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
|
|
||||||
|
/// The client asked for an invalid return format (invalid Accept header)
|
||||||
|
#[error(display = "Not acceptable: {}", _0)]
|
||||||
|
NotAcceptable(String),
|
||||||
|
|
||||||
/// The client sent a request for an action not supported by garage
|
/// The client sent a request for an action not supported by garage
|
||||||
#[error(display = "Unimplemented action: {}", _0)]
|
#[error(display = "Unimplemented action: {}", _0)]
|
||||||
NotImplemented(String),
|
NotImplemented(String),
|
||||||
|
@ -140,6 +144,7 @@ impl Error {
|
||||||
Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
|
Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT,
|
||||||
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
|
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
|
||||||
Error::Forbidden(_) => StatusCode::FORBIDDEN,
|
Error::Forbidden(_) => StatusCode::FORBIDDEN,
|
||||||
|
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
|
||||||
Error::InternalError(
|
Error::InternalError(
|
||||||
GarageError::Timeout
|
GarageError::Timeout
|
||||||
| GarageError::RemoteError(_)
|
| GarageError::RemoteError(_)
|
||||||
|
|
|
@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature;
|
||||||
use crate::signature::streaming::*;
|
use crate::signature::streaming::*;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
|
use crate::k2v::item::*;
|
||||||
use crate::k2v::router::Endpoint;
|
use crate::k2v::router::Endpoint;
|
||||||
use crate::s3::cors::*;
|
use crate::s3::cors::*;
|
||||||
|
|
||||||
|
@ -122,6 +123,10 @@ impl ApiHandler for K2VApiServer {
|
||||||
};
|
};
|
||||||
|
|
||||||
let resp = match endpoint {
|
let resp = match endpoint {
|
||||||
|
Endpoint::ReadItem {
|
||||||
|
partition_key,
|
||||||
|
sort_key,
|
||||||
|
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
|
||||||
//TODO
|
//TODO
|
||||||
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
||||||
};
|
};
|
||||||
|
|
123
src/api/k2v/item.rs
Normal file
123
src/api/k2v/item.rs
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
//! Function related to GET and HEAD requests
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, UNIX_EPOCH};
|
||||||
|
|
||||||
|
use futures::stream::*;
|
||||||
|
use http::header;
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
|
||||||
|
use garage_table::EmptyKey;
|
||||||
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::k2v::item_table::*;
|
||||||
|
|
||||||
|
use crate::error::*;
|
||||||
|
|
||||||
|
const X_GARAGE_CAUSALITY_TOKEN: &'static str = "X-Garage-Causality-Token";
|
||||||
|
|
||||||
|
pub enum ReturnFormat {
|
||||||
|
Json,
|
||||||
|
Binary,
|
||||||
|
Either,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReturnFormat {
|
||||||
|
pub fn from(req: &Request<Body>) -> Result<Self, Error> {
|
||||||
|
let accept = match req.headers().get(header::ACCEPT) {
|
||||||
|
Some(a) => a.to_str()?,
|
||||||
|
None => return Ok(Self::Json),
|
||||||
|
};
|
||||||
|
|
||||||
|
let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
|
||||||
|
let accept_json = accept.contains(&"application/json");
|
||||||
|
let accept_binary = accept.contains(&"application/octet-stream");
|
||||||
|
|
||||||
|
match (accept_json, accept_binary) {
|
||||||
|
(true, true) => Ok(Self::Either),
|
||||||
|
(true, false) => Ok(Self::Json),
|
||||||
|
(false, true) => Ok(Self::Binary),
|
||||||
|
(false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
|
||||||
|
let vals = item.values();
|
||||||
|
|
||||||
|
if vals.len() == 0 {
|
||||||
|
return Err(Error::NoSuchKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
let ct = item.causality_context().serialize();
|
||||||
|
match self {
|
||||||
|
Self::Binary if vals.len() > 1 => Ok(Response::builder()
|
||||||
|
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||||
|
.status(StatusCode::CONFLICT)
|
||||||
|
.body(Body::empty())?),
|
||||||
|
Self::Binary => {
|
||||||
|
assert!(vals.len() == 1);
|
||||||
|
Self::make_binary_response(ct, vals[0])
|
||||||
|
}
|
||||||
|
Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]),
|
||||||
|
_ => Self::make_json_response(ct, &vals[..]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
|
||||||
|
match v {
|
||||||
|
DvvsValue::Deleted => Ok(Response::builder()
|
||||||
|
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||||
|
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||||
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
.body(Body::empty())?),
|
||||||
|
DvvsValue::Value(v) => Ok(Response::builder()
|
||||||
|
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||||
|
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.body(Body::from(v.to_vec()))?),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
|
||||||
|
let items = v
|
||||||
|
.iter()
|
||||||
|
.map(|v| match v {
|
||||||
|
DvvsValue::Deleted => serde_json::Value::Null,
|
||||||
|
DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let json_body =
|
||||||
|
serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?;
|
||||||
|
Ok(Response::builder()
|
||||||
|
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||||
|
.header(header::CONTENT_TYPE, "application/json")
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.body(Body::from(json_body))?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle ReadItem request
|
||||||
|
pub async fn handle_read_item(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
req: &Request<Body>,
|
||||||
|
bucket_id: Uuid,
|
||||||
|
partition_key: &str,
|
||||||
|
sort_key: &String,
|
||||||
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let format = ReturnFormat::from(req)?;
|
||||||
|
|
||||||
|
let item = garage
|
||||||
|
.k2v_item_table
|
||||||
|
.get(
|
||||||
|
&K2VItemPartition {
|
||||||
|
bucket_id,
|
||||||
|
partition_key: partition_key.to_string(),
|
||||||
|
},
|
||||||
|
sort_key,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
.ok_or(Error::NoSuchKey)?;
|
||||||
|
|
||||||
|
format.make_response(&item)
|
||||||
|
}
|
|
@ -1,3 +1,5 @@
|
||||||
pub mod api_server;
|
pub mod api_server;
|
||||||
|
|
||||||
mod router;
|
mod router;
|
||||||
|
|
||||||
|
mod item;
|
||||||
|
|
|
@ -36,44 +36,36 @@ pub enum DvvsValue {
|
||||||
|
|
||||||
impl K2VItem {
|
impl K2VItem {
|
||||||
/// Creates a new K2VItem when no previous entry existed in the db
|
/// Creates a new K2VItem when no previous entry existed in the db
|
||||||
pub fn new(
|
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
|
||||||
bucket_id: Uuid,
|
Self {
|
||||||
partition_key: String,
|
|
||||||
sort_key: String,
|
|
||||||
this_node: Uuid,
|
|
||||||
value: DvvsValue,
|
|
||||||
) -> Self {
|
|
||||||
let mut ret = Self {
|
|
||||||
partition: K2VItemPartition {
|
partition: K2VItemPartition {
|
||||||
bucket_id,
|
bucket_id,
|
||||||
partition_key,
|
partition_key,
|
||||||
},
|
},
|
||||||
sort_key,
|
sort_key,
|
||||||
items: BTreeMap::new(),
|
items: BTreeMap::new(),
|
||||||
};
|
}
|
||||||
let node_id = make_node_id(this_node);
|
|
||||||
ret.items.insert(
|
|
||||||
node_id,
|
|
||||||
DvvsEntry {
|
|
||||||
t_discard: 0,
|
|
||||||
values: vec![(1, value)],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
/// Updates a K2VItem with a new value or a deletion event
|
/// Updates a K2VItem with a new value or a deletion event
|
||||||
pub fn update(&mut self, this_node: Uuid, context: CausalContext, new_value: DvvsValue) {
|
pub fn update(
|
||||||
for (node, t_discard) in context.vector_clock.iter() {
|
&mut self,
|
||||||
if let Some(e) = self.items.get_mut(node) {
|
this_node: Uuid,
|
||||||
e.t_discard = std::cmp::max(e.t_discard, *t_discard);
|
context: Option<CausalContext>,
|
||||||
} else {
|
new_value: DvvsValue,
|
||||||
self.items.insert(
|
) {
|
||||||
*node,
|
if let Some(context) = context {
|
||||||
DvvsEntry {
|
for (node, t_discard) in context.vector_clock.iter() {
|
||||||
t_discard: *t_discard,
|
if let Some(e) = self.items.get_mut(node) {
|
||||||
values: vec![],
|
e.t_discard = std::cmp::max(e.t_discard, *t_discard);
|
||||||
},
|
} else {
|
||||||
);
|
self.items.insert(
|
||||||
|
*node,
|
||||||
|
DvvsEntry {
|
||||||
|
t_discard: *t_discard,
|
||||||
|
values: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue