diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 7b340fe8..1213d793 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -133,7 +133,8 @@ pub async fn handle_insert_item( .map(|s| s.to_str()) .transpose()? .map(CausalContext::parse) - .transpose()?; + .transpose() + .ok_or_bad_request("Invalid causality token")?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); @@ -167,7 +168,8 @@ pub async fn handle_delete_item( .map(|s| s.to_str()) .transpose()? .map(CausalContext::parse) - .transpose()?; + .transpose() + .ok_or_bad_request("Invalid causality token")?; let value = DvvsValue::Deleted; @@ -200,13 +202,16 @@ pub async fn handle_poll_item( ) -> Result, Error> { let format = ReturnFormat::from(req)?; + let causal_context = + CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + let item = garage .k2v_rpc .poll( bucket_id, partition_key, sort_key, - causality_token, + causal_context, timeout_secs.unwrap_or(300) * 1000, ) .await?; diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 0c7633d0..8c76a32b 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -4,7 +4,6 @@ use std::convert::TryInto; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::*; /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. @@ -45,13 +44,11 @@ impl CausalContext { base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) } /// Parse from base64-encoded binary representation - pub fn parse(s: &str) -> Result { + pub fn parse(s: &str) -> Result { let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) - .ok_or_message("Invalid causality token (bad base64)")?; + .map_err(|e| format!("bad causality token base64: {}", e))?; if bytes.len() % 16 != 8 || bytes.len() < 8 { - return Err(Error::Message( - "Invalid causality token (bad length)".into(), - )); + return Err("bad causality token length".into()); } let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); @@ -68,9 +65,7 @@ impl CausalContext { let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); if check != checksum { - return Err(Error::Message( - "Invalid causality token (bad checksum)".into(), - )); + return Err("bad causality token checksum".into()); } Ok(ret) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index efe052a8..b11e06f4 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -38,7 +38,7 @@ enum K2VRpc { InsertManyItems(Vec), PollItem { key: PollKey, - causal_context: String, + causal_context: CausalContext, timeout_msec: u64, }, PollItemResponse(Option), @@ -189,7 +189,7 @@ impl K2VRpcHandler { bucket_id: Uuid, partition_key: String, sort_key: String, - causal_context: String, + causal_context: CausalContext, timeout_msec: u64, ) -> Result, Error> { let poll_key = PollKey { @@ -295,9 +295,7 @@ impl K2VRpcHandler { }) } - async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result { - let ct = CausalContext::parse(ct)?; - + async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { let mut chan = self.subscriptions.subscribe(key); let mut value = self