K2V #293
3 changed files with 15 additions and 17 deletions
|
@ -133,7 +133,8 @@ pub async fn handle_insert_item(
|
||||||
.map(|s| s.to_str())
|
.map(|s| s.to_str())
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.map(CausalContext::parse)
|
.map(CausalContext::parse)
|
||||||
.transpose()?;
|
.transpose()
|
||||||
|
.ok_or_bad_request("Invalid causality token")?;
|
||||||
|
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
let value = DvvsValue::Value(body.to_vec());
|
let value = DvvsValue::Value(body.to_vec());
|
||||||
|
@ -167,7 +168,8 @@ pub async fn handle_delete_item(
|
||||||
.map(|s| s.to_str())
|
.map(|s| s.to_str())
|
||||||
.transpose()?
|
.transpose()?
|
||||||
.map(CausalContext::parse)
|
.map(CausalContext::parse)
|
||||||
.transpose()?;
|
.transpose()
|
||||||
|
.ok_or_bad_request("Invalid causality token")?;
|
||||||
|
|
||||||
let value = DvvsValue::Deleted;
|
let value = DvvsValue::Deleted;
|
||||||
|
|
||||||
|
@ -200,13 +202,16 @@ pub async fn handle_poll_item(
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let format = ReturnFormat::from(req)?;
|
let format = ReturnFormat::from(req)?;
|
||||||
|
|
||||||
|
let causal_context =
|
||||||
|
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
|
||||||
|
|
||||||
let item = garage
|
let item = garage
|
||||||
.k2v_rpc
|
.k2v_rpc
|
||||||
.poll(
|
.poll(
|
||||||
bucket_id,
|
bucket_id,
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
causality_token,
|
causal_context,
|
||||||
timeout_secs.unwrap_or(300) * 1000,
|
timeout_secs.unwrap_or(300) * 1000,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -4,7 +4,6 @@ use std::convert::TryInto;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
|
||||||
|
|
||||||
/// Node IDs used in K2V are u64 integers that are the abbreviation
|
/// Node IDs used in K2V are u64 integers that are the abbreviation
|
||||||
/// of full Garage node IDs which are 256-bit UUIDs.
|
/// of full Garage node IDs which are 256-bit UUIDs.
|
||||||
|
@ -45,13 +44,11 @@ impl CausalContext {
|
||||||
base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
|
base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
|
||||||
}
|
}
|
||||||
/// Parse from base64-encoded binary representation
|
/// Parse from base64-encoded binary representation
|
||||||
pub fn parse(s: &str) -> Result<Self, Error> {
|
pub fn parse(s: &str) -> Result<Self, String> {
|
||||||
let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
|
let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
|
||||||
.ok_or_message("Invalid causality token (bad base64)")?;
|
.map_err(|e| format!("bad causality token base64: {}", e))?;
|
||||||
if bytes.len() % 16 != 8 || bytes.len() < 8 {
|
if bytes.len() % 16 != 8 || bytes.len() < 8 {
|
||||||
return Err(Error::Message(
|
return Err("bad causality token length".into());
|
||||||
"Invalid causality token (bad length)".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
|
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
|
||||||
|
@ -68,9 +65,7 @@ impl CausalContext {
|
||||||
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
|
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
|
||||||
|
|
||||||
if check != checksum {
|
if check != checksum {
|
||||||
return Err(Error::Message(
|
return Err("bad causality token checksum".into());
|
||||||
"Invalid causality token (bad checksum)".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
|
|
|
@ -38,7 +38,7 @@ enum K2VRpc {
|
||||||
InsertManyItems(Vec<InsertedItem>),
|
InsertManyItems(Vec<InsertedItem>),
|
||||||
PollItem {
|
PollItem {
|
||||||
key: PollKey,
|
key: PollKey,
|
||||||
causal_context: String,
|
causal_context: CausalContext,
|
||||||
timeout_msec: u64,
|
timeout_msec: u64,
|
||||||
},
|
},
|
||||||
PollItemResponse(Option<K2VItem>),
|
PollItemResponse(Option<K2VItem>),
|
||||||
|
@ -189,7 +189,7 @@ impl K2VRpcHandler {
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
partition_key: String,
|
partition_key: String,
|
||||||
sort_key: String,
|
sort_key: String,
|
||||||
causal_context: String,
|
causal_context: CausalContext,
|
||||||
timeout_msec: u64,
|
timeout_msec: u64,
|
||||||
) -> Result<Option<K2VItem>, Error> {
|
) -> Result<Option<K2VItem>, Error> {
|
||||||
let poll_key = PollKey {
|
let poll_key = PollKey {
|
||||||
|
@ -295,9 +295,7 @@ impl K2VRpcHandler {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result<K2VItem, Error> {
|
async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
|
||||||
let ct = CausalContext::parse(ct)?;
|
|
||||||
|
|
||||||
let mut chan = self.subscriptions.subscribe(key);
|
let mut chan = self.subscriptions.subscribe(key);
|
||||||
|
|
||||||
let mut value = self
|
let mut value = self
|
||||||
|
|
Loading…
Reference in a new issue