K2V PollRange, version 2 #471

Merged
lx merged 18 commits from k2v-watch-range-2 into main 2023-01-26 16:19:05 +00:00
5 changed files with 39 additions and 29 deletions
Showing only changes of commit bba13f40fc - Show all commits

View file

@ -24,11 +24,7 @@ pub async fn handle_insert_batch(
let mut items2 = vec![]; let mut items2 = vec![];
for it in items { for it in items {
let ct = it let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
.ct
.map(|s| CausalContext::parse(&s))
.transpose()
.ok_or_bad_request("Invalid causality token")?;
let v = match it.v { let v = match it.v {
Some(vs) => { Some(vs) => {
DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)

View file

@ -133,9 +133,8 @@ pub async fn handle_insert_item(
.get(X_GARAGE_CAUSALITY_TOKEN) .get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str()) .map(|s| s.to_str())
.transpose()? .transpose()?
.map(CausalContext::parse) .map(CausalContext::parse_helper)
.transpose() .transpose()?;
.ok_or_bad_request("Invalid causality token")?;
let body = hyper::body::to_bytes(req.into_body()).await?; let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec()); let value = DvvsValue::Value(body.to_vec());
@ -169,9 +168,8 @@ pub async fn handle_delete_item(
.get(X_GARAGE_CAUSALITY_TOKEN) .get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str()) .map(|s| s.to_str())
.transpose()? .transpose()?
.map(CausalContext::parse) .map(CausalContext::parse_helper)
.transpose() .transpose()?;
.ok_or_bad_request("Invalid causality token")?;
let value = DvvsValue::Deleted; let value = DvvsValue::Deleted;

View file

@ -15,6 +15,8 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
/// Node IDs used in K2V are u64 integers that are the abbreviation /// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs. /// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64; pub type K2VNodeId = u64;
@ -50,6 +52,7 @@ impl CausalContext {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
/// Make binary representation and encode in base64 /// Make binary representation and encode in base64
pub fn serialize(&self) -> String { pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@ -66,12 +69,13 @@ impl CausalContext {
base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
} }
/// Parse from base64-encoded binary representation
pub fn parse(s: &str) -> Result<Self, String> { /// Parse from base64-encoded binary representation.
let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) /// Returns None on error.
.map_err(|e| format!("bad causality token base64: {}", e))?; pub fn parse(s: &str) -> Option<Self> {
let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 { if bytes.len() % 16 != 8 || bytes.len() < 8 {
return Err("bad causality token length".into()); return None;
} }
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@ -88,11 +92,16 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum { if check != checksum {
return Err("bad causality token checksum".into()); return None;
} }
Ok(ret) Some(ret)
} }
pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
Self::parse(s).ok_or_bad_request("Invalid causality token")
}
/// Check if this causal context contains newer items than another one /// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool { pub fn is_newer_than(&self, other: &Self) -> bool {
vclock_gt(&self.vector_clock, &other.vector_clock) vclock_gt(&self.vector_clock, &other.vector_clock)

View file

@ -29,6 +29,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table}; use garage_table::{PartitionKey, Table};
use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*; use crate::k2v::causality::*;
use crate::k2v::item_table::*; use crate::k2v::item_table::*;
use crate::k2v::seen::*; use crate::k2v::seen::*;
@ -212,7 +213,7 @@ impl K2VRpcHandler {
sort_key: String, sort_key: String,
causal_context: CausalContext, causal_context: CausalContext,
timeout_msec: u64, timeout_msec: u64,
) -> Result<Option<K2VItem>, Error> { ) -> Result<Option<K2VItem>, HelperError> {
let poll_key = PollKey { let poll_key = PollKey {
partition: K2VItemPartition { partition: K2VItemPartition {
bucket_id, bucket_id,
@ -255,7 +256,7 @@ impl K2VRpcHandler {
} }
} }
K2VRpc::PollItemResponse(None) => (), K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)), v => return Err(Error::unexpected_rpc_message(v).into()),
} }
} }
@ -267,12 +268,12 @@ impl K2VRpcHandler {
range: PollRange, range: PollRange,
seen_str: Option<String>, seen_str: Option<String>,
timeout_msec: u64, timeout_msec: u64,
) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> { ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
let has_seen_marker = seen_str.is_some(); let has_seen_marker = seen_str.is_some();
let mut seen = seen_str let mut seen = seen_str
.as_deref() .as_deref()
.map(RangeSeenMarker::decode) .map(RangeSeenMarker::decode_helper)
.transpose()? .transpose()?
.unwrap_or_default(); .unwrap_or_default();
seen.restrict(&range); seen.restrict(&range);
@ -316,7 +317,7 @@ impl K2VRpcHandler {
} }
} }
} else { } else {
return Err(Error::unexpected_rpc_message(v)); return Err(Error::unexpected_rpc_message(v).into());
} }
} }
@ -435,7 +436,7 @@ impl K2VRpcHandler {
seen_str: &Option<String>, seen_str: &Option<String>,
) -> Result<Vec<K2VItem>, Error> { ) -> Result<Vec<K2VItem>, Error> {
if let Some(seen_str) = seen_str { if let Some(seen_str) = seen_str {
let seen = RangeSeenMarker::decode(seen_str)?; let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
// Subscribe now to all changes on that partition, // Subscribe now to all changes on that partition,
// so that new items that are inserted while we are reading the range // so that new items that are inserted while we are reading the range

View file

@ -13,8 +13,9 @@ use serde::{Deserialize, Serialize};
use garage_util::data::Uuid; use garage_util::data::Uuid;
use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage}; use garage_util::error::Error;
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
use crate::k2v::causality::*; use crate::k2v::causality::*;
use crate::k2v::item_table::*; use crate::k2v::item_table::*;
use crate::k2v::sub::*; use crate::k2v::sub::*;
@ -80,10 +81,15 @@ impl RangeSeenMarker {
Ok(base64::encode(&bytes)) Ok(base64::encode(&bytes))
} }
pub fn decode(s: &str) -> Result<Self, Error> { /// Decode from msgpack+zstd+b64 representation, returns None on error.
let bytes = base64::decode(&s).ok_or_message("invalid base64")?; pub fn decode(s: &str) -> Option<Self> {
let bytes = zstd::stream::decode_all(&mut &bytes[..])?; let bytes = base64::decode(&s).ok()?;
Ok(nonversioned_decode(&bytes)?) let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
nonversioned_decode(&bytes).ok()
}
pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
Self::decode(s).ok_or_bad_request("Invalid causality token")
} }
pub fn is_new_item(&self, item: &K2VItem) -> bool { pub fn is_new_item(&self, item: &K2VItem) -> bool {