From 84536f2e75f26514a104b3e97e6418b5ad6df2d2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 14 Apr 2022 16:19:31 +0200 Subject: [PATCH] First implementation of ReadItem --- src/api/error.rs | 5 ++ src/api/k2v/api_server.rs | 5 ++ src/api/k2v/item.rs | 123 ++++++++++++++++++++++++++++++++++++ src/api/k2v/mod.rs | 2 + src/model/k2v/item_table.rs | 52 +++++++-------- 5 files changed, 157 insertions(+), 30 deletions(-) create mode 100644 src/api/k2v/item.rs diff --git a/src/api/error.rs b/src/api/error.rs index cd7afe5a..4b7254d2 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -100,6 +100,10 @@ pub enum Error { #[error(display = "Bad request: {}", _0)] BadRequest(String), + /// The client asked for an invalid return format (invalid Accept header) + #[error(display = "Not acceptable: {}", _0)] + NotAcceptable(String), + /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), @@ -140,6 +144,7 @@ impl Error { Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT, Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, Error::Forbidden(_) => StatusCode::FORBIDDEN, + Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::InternalError( GarageError::Timeout | GarageError::RemoteError(_) diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb082c8d..a19dcdd4 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; use crate::helpers::*; +use crate::k2v::item::*; use crate::k2v::router::Endpoint; use crate::s3::cors::*; @@ -122,6 +123,10 @@ impl ApiHandler for K2VApiServer { }; let resp = match endpoint { + Endpoint::ReadItem { + partition_key, + sort_key, + } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, //TODO endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs new file mode 100644 index 00000000..2a1bb049 --- /dev/null +++ b/src/api/k2v/item.rs @@ -0,0 +1,123 @@ +//! Function related to GET and HEAD requests +use std::sync::Arc; +use std::time::{Duration, UNIX_EPOCH}; + +use futures::stream::*; +use http::header; +use hyper::body::Bytes; +use hyper::{Body, Request, Response, StatusCode}; + +use garage_table::EmptyKey; +use garage_util::data::*; + +use garage_model::garage::Garage; +use garage_model::k2v::item_table::*; + +use crate::error::*; + +const X_GARAGE_CAUSALITY_TOKEN: &'static str = "X-Garage-Causality-Token"; + +pub enum ReturnFormat { + Json, + Binary, + Either, +} + +impl ReturnFormat { + pub fn from(req: &Request) -> Result { + let accept = match req.headers().get(header::ACCEPT) { + Some(a) => a.to_str()?, + None => return Ok(Self::Json), + }; + + let accept = accept.split(',').map(|s| s.trim()).collect::>(); + let accept_json = accept.contains(&"application/json"); + let accept_binary = accept.contains(&"application/octet-stream"); + + match (accept_json, accept_binary) { + (true, true) => Ok(Self::Either), + (true, false) => Ok(Self::Json), + (false, true) => Ok(Self::Binary), + (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())), + } + } + + pub fn make_response(&self, item: &K2VItem) -> Result, Error> { + let vals = item.values(); + + if vals.len() == 0 { + return Err(Error::NoSuchKey); + } + + let ct = item.causality_context().serialize(); + match self { + Self::Binary if vals.len() > 1 => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .status(StatusCode::CONFLICT) + .body(Body::empty())?), + Self::Binary => { + assert!(vals.len() == 1); + Self::make_binary_response(ct, vals[0]) + } + Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]), + _ => Self::make_json_response(ct, &vals[..]), + } + } + + fn make_binary_response(ct: String, v: &DvvsValue) -> Result, Error> { + match v { + DvvsValue::Deleted => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?), + DvvsValue::Value(v) => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::OK) + .body(Body::from(v.to_vec()))?), + } + } + + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result, Error> { + let items = v + .iter() + .map(|v| match v { + DvvsValue::Deleted => serde_json::Value::Null, + DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)), + }) + .collect::>(); + let json_body = + serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?; + Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/json") + .status(StatusCode::OK) + .body(Body::from(json_body))?) + } +} + +/// Handle ReadItem request +pub async fn handle_read_item( + garage: Arc, + req: &Request, + bucket_id: Uuid, + partition_key: &str, + sort_key: &String, +) -> Result, Error> { + let format = ReturnFormat::from(req)?; + + let item = garage + .k2v_item_table + .get( + &K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + sort_key, + ) + .await? + .ok_or(Error::NoSuchKey)?; + + format.make_response(&item) +} diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs index 23f24444..cf8247f7 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/mod.rs @@ -1,3 +1,5 @@ pub mod api_server; mod router; + +mod item; diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 3b79ebc9..d3ef5769 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -36,44 +36,36 @@ pub enum DvvsValue { impl K2VItem { /// Creates a new K2VItem when no previous entry existed in the db - pub fn new( - bucket_id: Uuid, - partition_key: String, - sort_key: String, - this_node: Uuid, - value: DvvsValue, - ) -> Self { - let mut ret = Self { + pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { + Self { partition: K2VItemPartition { bucket_id, partition_key, }, sort_key, items: BTreeMap::new(), - }; - let node_id = make_node_id(this_node); - ret.items.insert( - node_id, - DvvsEntry { - t_discard: 0, - values: vec![(1, value)], - }, - ); - ret + } } /// Updates a K2VItem with a new value or a deletion event - pub fn update(&mut self, this_node: Uuid, context: CausalContext, new_value: DvvsValue) { - for (node, t_discard) in context.vector_clock.iter() { - if let Some(e) = self.items.get_mut(node) { - e.t_discard = std::cmp::max(e.t_discard, *t_discard); - } else { - self.items.insert( - *node, - DvvsEntry { - t_discard: *t_discard, - values: vec![], - }, - ); + pub fn update( + &mut self, + this_node: Uuid, + context: Option, + new_value: DvvsValue, + ) { + if let Some(context) = context { + for (node, t_discard) in context.vector_clock.iter() { + if let Some(e) = self.items.get_mut(node) { + e.t_discard = std::cmp::max(e.t_discard, *t_discard); + } else { + self.items.insert( + *node, + DvvsEntry { + t_discard: *t_discard, + values: vec![], + }, + ); + } } }