K2V #293
11 changed files with 257 additions and 24 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -966,6 +966,8 @@ version = "0.7.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"base64",
|
||||||
|
"blake2",
|
||||||
"err-derive 0.3.1",
|
"err-derive 0.3.1",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
|
|
@ -7,8 +7,8 @@ use garage_model::bucket_alias_table::*;
|
||||||
use garage_model::bucket_table::Bucket;
|
use garage_model::bucket_table::Bucket;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
use garage_model::key_table::Key;
|
||||||
use garage_model::s3::object_table::ObjectFilter;
|
|
||||||
use garage_model::permission::BucketKeyPerm;
|
use garage_model::permission::BucketKeyPerm;
|
||||||
|
use garage_model::s3::object_table::ObjectFilter;
|
||||||
use garage_table::util::*;
|
use garage_table::util::*;
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
|
@ -11,9 +11,9 @@ use garage_table::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_model::s3::block_ref_table::*;
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
use garage_model::key_table::Key;
|
||||||
|
use garage_model::s3::block_ref_table::*;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
use garage_model::s3::version_table::*;
|
use garage_model::s3::version_table::*;
|
||||||
|
|
||||||
|
|
|
@ -955,7 +955,7 @@ fn key_after_prefix(pfx: &str) -> Option<String> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::s3::version_table::*;
|
||||||
use garage_util::*;
|
use garage_util::*;
|
||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
|
||||||
|
|
|
@ -14,8 +14,8 @@ use garage_util::error::Error as GarageError;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_block::manager::INLINE_THRESHOLD;
|
use garage_block::manager::INLINE_THRESHOLD;
|
||||||
use garage_model::s3::block_ref_table::*;
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::s3::block_ref_table::*;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
use garage_model::s3::version_table::*;
|
use garage_model::s3::version_table::*;
|
||||||
|
|
||||||
|
|
|
@ -21,8 +21,8 @@ use garage_model::garage::Garage;
|
||||||
use garage_model::helper::error::{Error, OkOrBadRequest};
|
use garage_model::helper::error::{Error, OkOrBadRequest};
|
||||||
use garage_model::key_table::*;
|
use garage_model::key_table::*;
|
||||||
use garage_model::migrate::Migrate;
|
use garage_model::migrate::Migrate;
|
||||||
use garage_model::s3::object_table::ObjectFilter;
|
|
||||||
use garage_model::permission::*;
|
use garage_model::permission::*;
|
||||||
|
use garage_model::s3::object_table::ObjectFilter;
|
||||||
|
|
||||||
use crate::cli::*;
|
use crate::cli::*;
|
||||||
use crate::repair::Repair;
|
use crate::repair::Repair;
|
||||||
|
|
|
@ -2,8 +2,8 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use garage_model::s3::block_ref_table::*;
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::s3::block_ref_table::*;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
use garage_model::s3::version_table::*;
|
use garage_model::s3::version_table::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
|
@ -22,8 +22,10 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" }
|
||||||
|
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
arc-swap = "1.0"
|
arc-swap = "1.0"
|
||||||
|
blake2 = "0.9"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
|
base64 = "0.13"
|
||||||
tracing = "0.1.30"
|
tracing = "0.1.30"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
zstd = { version = "0.9", default-features = false }
|
zstd = { version = "0.9", default-features = false }
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
use garage_util::error::*;
|
||||||
|
|
||||||
/// Node IDs used in K2V are u64 integers that are the abbreviation
|
/// Node IDs used in K2V are u64 integers that are the abbreviation
|
||||||
/// of full Garage node IDs which are 256-bit UUIDs.
|
/// of full Garage node IDs which are 256-bit UUIDs.
|
||||||
|
@ -12,12 +14,12 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
|
||||||
u64::from_be_bytes(tmp)
|
u64::from_be_bytes(tmp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Debug)]
|
||||||
pub struct CausalityContext {
|
pub struct CausalContext {
|
||||||
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CausalityContext {
|
impl CausalContext {
|
||||||
/// Empty causality context
|
/// Empty causality context
|
||||||
pub fn new_empty() -> Self {
|
pub fn new_empty() -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -26,10 +28,65 @@ impl CausalityContext {
|
||||||
}
|
}
|
||||||
/// Make binary representation and encode in base64
|
/// Make binary representation and encode in base64
|
||||||
pub fn serialize(&self) -> String {
|
pub fn serialize(&self) -> String {
|
||||||
unimplemented!(); //TODO
|
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
|
||||||
|
for (node, time) in self.vector_clock.iter() {
|
||||||
|
ints.push(*node);
|
||||||
|
ints.push(*time);
|
||||||
|
}
|
||||||
|
let checksum = ints.iter().fold(0, |acc, v| acc ^ *v);
|
||||||
|
|
||||||
|
let mut bytes = u64::to_be_bytes(checksum).to_vec();
|
||||||
|
for i in ints {
|
||||||
|
bytes.extend(u64::to_be_bytes(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
base64::encode(bytes)
|
||||||
}
|
}
|
||||||
/// Parse from base64-encoded binary representation
|
/// Parse from base64-encoded binary representation
|
||||||
pub fn parse(s: &str) -> Self {
|
pub fn parse(s: &str) -> Result<Self, Error> {
|
||||||
unimplemented!(); //TODO
|
let bytes = base64::decode(s).ok_or_message("Invalid causality token (bad base64)")?;
|
||||||
|
if bytes.len() % 16 != 8 || bytes.len() < 8 {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"Invalid causality token (bad length)".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
|
||||||
|
let mut ret = CausalContext {
|
||||||
|
vector_clock: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
for i in 0..(bytes.len() / 16) {
|
||||||
|
let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap());
|
||||||
|
let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap());
|
||||||
|
ret.vector_clock.insert(node_id, time);
|
||||||
|
}
|
||||||
|
|
||||||
|
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
|
||||||
|
|
||||||
|
if check != checksum {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"Invalid causality token (bad checksum)".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_causality_token_serialization() {
|
||||||
|
let ct = CausalContext {
|
||||||
|
vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.collect(),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,13 +10,18 @@ use crate::k2v::causality::*;
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct K2VItem {
|
pub struct K2VItem {
|
||||||
pub bucket_id: Uuid,
|
pub partition: K2VItemPartition,
|
||||||
pub partition_key: String,
|
|
||||||
pub sort_key: String,
|
pub sort_key: String,
|
||||||
|
|
||||||
items: BTreeMap<K2VNodeId, DvvsEntry>,
|
items: BTreeMap<K2VNodeId, DvvsEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct K2VItemPartition {
|
||||||
|
pub bucket_id: Uuid,
|
||||||
|
pub partition_key: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
struct DvvsEntry {
|
struct DvvsEntry {
|
||||||
t_discard: u64,
|
t_discard: u64,
|
||||||
|
@ -31,33 +36,200 @@ pub enum DvvsValue {
|
||||||
|
|
||||||
impl K2VItem {
|
impl K2VItem {
|
||||||
/// Creates a new K2VItem when no previous entry existed in the db
|
/// Creates a new K2VItem when no previous entry existed in the db
|
||||||
pub fn new(this_node: Uuid, value: DvvsValue) -> Self {
|
pub fn new(
|
||||||
unimplemented!(); // TODO
|
bucket_id: Uuid,
|
||||||
|
partition_key: String,
|
||||||
|
sort_key: String,
|
||||||
|
this_node: Uuid,
|
||||||
|
value: DvvsValue,
|
||||||
|
) -> Self {
|
||||||
|
let mut ret = Self {
|
||||||
|
partition: K2VItemPartition {
|
||||||
|
bucket_id,
|
||||||
|
partition_key,
|
||||||
|
},
|
||||||
|
sort_key,
|
||||||
|
items: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
let node_id = make_node_id(this_node);
|
||||||
|
ret.items.insert(
|
||||||
|
node_id,
|
||||||
|
DvvsEntry {
|
||||||
|
t_discard: 0,
|
||||||
|
values: vec![(1, value)],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
ret
|
||||||
}
|
}
|
||||||
/// Updates a K2VItem with a new value or a deletion event
|
/// Updates a K2VItem with a new value or a deletion event
|
||||||
pub fn update(&mut self, this_node: Uuid, context: CausalityContext, new_value: DvvsValue) {
|
pub fn update(&mut self, this_node: Uuid, context: CausalContext, new_value: DvvsValue) {
|
||||||
unimplemented!(); // TODO
|
for (node, t_discard) in context.vector_clock.iter() {
|
||||||
|
if let Some(e) = self.items.get_mut(node) {
|
||||||
|
e.t_discard = std::cmp::max(e.t_discard, *t_discard);
|
||||||
|
} else {
|
||||||
|
self.items.insert(
|
||||||
|
*node,
|
||||||
|
DvvsEntry {
|
||||||
|
t_discard: *t_discard,
|
||||||
|
values: vec![],
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.discard();
|
||||||
|
|
||||||
|
let node_id = make_node_id(this_node);
|
||||||
|
let e = self.items.entry(node_id).or_insert(DvvsEntry {
|
||||||
|
t_discard: 0,
|
||||||
|
values: vec![],
|
||||||
|
});
|
||||||
|
let t_prev = e.max_time();
|
||||||
|
e.values.push((t_prev + 1, new_value));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract the causality context of a K2V Item
|
/// Extract the causality context of a K2V Item
|
||||||
pub fn causality_context(&self) -> CausalityContext {
|
pub fn causality_context(&self) -> CausalContext {
|
||||||
unimplemented!(); // TODO
|
let mut cc = CausalContext::new_empty();
|
||||||
|
for (node, ent) in self.items.iter() {
|
||||||
|
cc.vector_clock.insert(*node, ent.max_time());
|
||||||
|
}
|
||||||
|
cc
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract the list of values
|
/// Extract the list of values
|
||||||
pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
|
pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
|
||||||
unimplemented!(); // TODO
|
let mut ret = vec![];
|
||||||
|
for (_, ent) in self.items.iter() {
|
||||||
|
for (_, v) in ent.values.iter() {
|
||||||
|
ret.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
fn discard(&mut self) {
|
||||||
|
for (_, ent) in self.items.iter_mut() {
|
||||||
|
ent.discard();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DvvsEntry {
|
||||||
|
fn max_time(&self) -> u64 {
|
||||||
|
self.values
|
||||||
|
.iter()
|
||||||
|
.fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn discard(&mut self) {
|
||||||
|
self.values = std::mem::take(&mut self.values)
|
||||||
|
.into_iter()
|
||||||
|
.filter(|(t, _)| *t > self.t_discard)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Crdt for K2VItem {
|
impl Crdt for K2VItem {
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
unimplemented!(); // TODO
|
for (node, e2) in other.items.iter() {
|
||||||
|
if let Some(e) = self.items.get_mut(node) {
|
||||||
|
e.merge(e2);
|
||||||
|
} else {
|
||||||
|
self.items.insert(*node, e2.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Crdt for DvvsEntry {
|
impl Crdt for DvvsEntry {
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
unimplemented!(); // TODO
|
self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
|
||||||
|
self.discard();
|
||||||
|
|
||||||
|
let t_max = self.max_time();
|
||||||
|
for (vt, vv) in other.values.iter() {
|
||||||
|
if *vt > t_max {
|
||||||
|
self.values.push((*vt, vv.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartitionKey for K2VItemPartition {
|
||||||
|
fn hash(&self) -> Hash {
|
||||||
|
use blake2::{Blake2b, Digest};
|
||||||
|
|
||||||
|
let mut hasher = Blake2b::new();
|
||||||
|
hasher.update(self.bucket_id.as_slice());
|
||||||
|
hasher.update(self.partition_key.as_bytes());
|
||||||
|
let mut hash = [0u8; 32];
|
||||||
|
hash.copy_from_slice(&hasher.finalize()[..32]);
|
||||||
|
hash.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Entry<K2VItemPartition, String> for K2VItem {
|
||||||
|
fn partition_key(&self) -> &K2VItemPartition {
|
||||||
|
&self.partition
|
||||||
|
}
|
||||||
|
fn sort_key(&self) -> &String {
|
||||||
|
&self.sort_key
|
||||||
|
}
|
||||||
|
fn is_tombstone(&self) -> bool {
|
||||||
|
self.values()
|
||||||
|
.iter()
|
||||||
|
.all(|v| matches!(v, DvvsValue::Deleted))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct K2VItemTable {}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ItemFilter {
|
||||||
|
pub exclude_only_tombstones: bool,
|
||||||
|
pub conflicts_only: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TableSchema for K2VItemTable {
|
||||||
|
const TABLE_NAME: &'static str = "k2v_item";
|
||||||
|
|
||||||
|
type P = K2VItemPartition;
|
||||||
|
type S = String;
|
||||||
|
type E = K2VItem;
|
||||||
|
type Filter = ItemFilter;
|
||||||
|
|
||||||
|
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {
|
||||||
|
// nothing for now
|
||||||
|
}
|
||||||
|
|
||||||
|
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||||
|
let v = entry.values();
|
||||||
|
!(filter.conflicts_only && v.len() < 2)
|
||||||
|
&& !(filter.exclude_only_tombstones && entry.is_tombstone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_dvvsentry_merge_simple() {
|
||||||
|
let e1 = DvvsEntry {
|
||||||
|
t_discard: 4,
|
||||||
|
values: vec![
|
||||||
|
(5, DvvsValue::Value(vec![15])),
|
||||||
|
(6, DvvsValue::Value(vec![16])),
|
||||||
|
],
|
||||||
|
};
|
||||||
|
let e2 = DvvsEntry {
|
||||||
|
t_discard: 5,
|
||||||
|
values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut e3 = e1.clone();
|
||||||
|
e3.merge(&e2);
|
||||||
|
assert_eq!(e2, e3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,8 +7,8 @@ pub mod bucket_alias_table;
|
||||||
pub mod bucket_table;
|
pub mod bucket_table;
|
||||||
pub mod key_table;
|
pub mod key_table;
|
||||||
|
|
||||||
pub mod s3;
|
|
||||||
pub mod k2v;
|
pub mod k2v;
|
||||||
|
pub mod s3;
|
||||||
|
|
||||||
pub mod garage;
|
pub mod garage;
|
||||||
pub mod helper;
|
pub mod helper;
|
||||||
|
|
Loading…
Reference in a new issue