From 49b5d18554c67b84777d97f24423207c2375ae5e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 5 Jan 2023 13:11:48 +0100 Subject: [PATCH 1/3] K2V history and preparation for range watch --- src/api/k2v/item.rs | 2 +- src/model/garage.rs | 26 +++++++- src/model/k2v/history_table.rs | 107 +++++++++++++++++++++++++++++++++ src/model/k2v/item_table.rs | 11 ++-- src/model/k2v/mod.rs | 1 + src/model/k2v/poll.rs | 75 +++++++++++++++++++++-- src/model/k2v/rpc.rs | 62 ++++++++++++++++--- src/table/data.rs | 27 +++++---- src/table/schema.rs | 8 ++- src/table/util.rs | 2 + 10 files changed, 290 insertions(+), 31 deletions(-) create mode 100644 src/model/k2v/history_table.rs diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, diff --git a/src/model/garage.rs b/src/model/garage.rs index ac1846ce..a33265af 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -70,6 +70,8 @@ pub struct Garage { pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc>, + /// Table containing K2V modification history + pub history_table: Arc>, /// Indexing table containing K2V item counters pub counter_table: Arc>, /// K2V RPC handler @@ -305,22 +307,42 @@ impl GarageK2V { fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { counter_table: counter_table.clone(), subscriptions: subscriptions.clone(), }, + meta_rep_param.clone(), + system.clone(), + db, + ); + info!("Initialize K2V history table..."); + let history_table = Table::new( + K2VHistoryTable { + subscriptions: subscriptions.clone(), + }, meta_rep_param, system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new( + system, + db, + item_table.clone(), + history_table.clone(), + subscriptions, + ); Self { item_table, + history_table, counter_table, rpc, } diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs new file mode 100644 index 00000000..6a6e9a10 --- /dev/null +++ b/src/model/k2v/history_table.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::k2v::poll::*; + +mod v08 { + use crate::k2v::causality::K2VNodeId; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition}; + use garage_util::crdt; + use serde::{Deserialize, Serialize}; + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistoryEntry { + /// Partition key: a K2V partition + pub partition: K2VItemPartition, + /// Sort key: the node ID and its local counter + pub node_counter: K2VHistorySortKey, + + /// The value of the node's local counter before this entry was updated + pub prev_counter: u64, + /// The timesamp of the update (!= counter, counters are incremented + /// by one, timestamps are real clock timestamps) + pub timestamp: u64, + /// The sort key of the item that was inserted + pub ins_sort_key: String, + /// The inserted value + pub ins_value: DvvsValue, + + /// Whether this history entry is too old and should be deleted + pub deleted: crdt::Bool, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistorySortKey { + pub node: K2VNodeId, + pub counter: u64, + } + + impl garage_util::migrate::InitialFormat for K2VHistoryEntry { + const VERSION_MARKER: &'static [u8] = b"Gk2vhe08"; + } +} + +pub use v08::*; + +impl Crdt for K2VHistoryEntry { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + } +} + +impl SortKey for K2VHistorySortKey { + type B<'a> = [u8; 16]; + + fn sort_key(&self) -> [u8; 16] { + let mut ret = [0u8; 16]; + ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node)); + ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter)); + ret + } +} + +impl Entry for K2VHistoryEntry { + fn partition_key(&self) -> &K2VItemPartition { + &self.partition + } + fn sort_key(&self) -> &K2VHistorySortKey { + &self.node_counter + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +pub struct K2VHistoryTable { + pub(crate) subscriptions: Arc, +} + +impl TableSchema for K2VHistoryTable { + const TABLE_NAME: &'static str = "k2v_history"; + + type P = K2VItemPartition; + type S = K2VHistorySortKey; + type E = K2VHistoryEntry; + type Filter = DeletedFilter; + + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + if let Some(new_ent) = new { + self.subscriptions.notify_range(new_ent); + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index ce3e4129..90a2f4d0 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -18,7 +18,7 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -mod v08 { +pub(super) mod v08 { use crate::k2v::causality::K2VNodeId; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option, new_value: DvvsValue, - ) { + node_counter: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,7 +99,9 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(node_counter + 1, t_prev + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item @@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable { // 2. Notify if let Some(new_ent) = new { - self.subscriptions.notify(new_ent); + self.subscriptions.notify_item(new_ent); } Ok(()) diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..18deabac 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,5 +1,6 @@ pub mod causality; +pub mod history_table; pub mod item_table; pub mod poll; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs index 93105207..ea3e8d41 100644 --- a/src/model/k2v/poll.rs +++ b/src/model/k2v/poll.rs @@ -4,6 +4,7 @@ use std::sync::Mutex; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; #[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -12,9 +13,18 @@ pub struct PollKey { pub sort_key: String, } +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option, + pub start: Option, + pub end: Option, +} + #[derive(Default)] pub struct SubscriptionManager { - subscriptions: Mutex>>, + item_subscriptions: Mutex>>, + range_subscriptions: Mutex>>, } impl SubscriptionManager { @@ -22,8 +32,10 @@ impl SubscriptionManager { Self::default() } - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver { - let mut subs = self.subscriptions.lock().unwrap(); + // ---- simple item polling ---- + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + let mut subs = self.item_subscriptions.lock().unwrap(); if let Some(s) = subs.get(key) { s.subscribe() } else { @@ -33,12 +45,12 @@ impl SubscriptionManager { } } - pub fn notify(&self, item: &K2VItem) { + pub fn notify_item(&self, item: &K2VItem) { let key = PollKey { partition: item.partition.clone(), sort_key: item.sort_key.clone(), }; - let mut subs = self.subscriptions.lock().unwrap(); + let mut subs = self.item_subscriptions.lock().unwrap(); if let Some(s) = subs.get(&key) { if s.send(item.clone()).is_err() { // no more subscribers, remove channel from here @@ -47,4 +59,57 @@ impl SubscriptionManager { } } } + + // ---- range polling ---- + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + let mut subs = self.range_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_range(&self, entry: &K2VHistoryEntry) { + let mut subs = self.range_subscriptions.lock().unwrap(); + let mut dead_subs = vec![]; + + for (sub, chan) in subs.iter() { + if sub.matches(&entry) { + if chan.send(entry.clone()).is_err() { + dead_subs.push(sub.clone()); + } + } else if chan.receiver_count() == 0 { + dead_subs.push(sub.clone()); + } + } + + for sub in dead_subs.iter() { + subs.remove(sub); + } + } +} + +impl PollRange { + fn matches(&self, entry: &K2VHistoryEntry) -> bool { + entry.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| entry.ins_sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| entry.ins_sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| entry.ins_sort_key < *x) + .unwrap_or(true) + } } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..1dc396c0 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,6 +6,7 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -26,6 +30,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; use crate::k2v::poll::*; @@ -59,6 +64,8 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + history_table: Arc>, + local_counter_tree: db::Tree, endpoint: Arc>, subscriptions: Arc, } @@ -66,14 +73,21 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc, + db: &db::Db, item_table: Arc>, + history_table: Arc>, subscriptions: Arc, ) -> Arc { + let local_counter_tree = db + .open_tree("k2v_local_counter") + .expect("Unable to open DB tree for k2v local counter"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + history_table, + local_counter_tree, endpoint, subscriptions, }); @@ -181,7 +195,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -273,9 +287,17 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_counter = tx + .get(&self.local_counter_tree, b"counter")? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +305,39 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_counter = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + old_local_counter, + ); + + tx.insert( + &self.local_counter_tree, + b"counter", + u64::to_be_bytes(new_local_counter), + )?; + + let hist_entry = K2VHistoryEntry { + partition: ent.partition.clone(), + node_counter: K2VHistorySortKey { + node: make_node_id(self.system.id), + counter: new_local_counter, + }, + prev_counter: old_local_counter, + timestamp: now, + ins_sort_key: item.sort_key.clone(), + ins_value: item.value.clone(), + deleted: false.into(), + }; + self.history_table.queue_insert(tx, &hist_entry)?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +374,7 @@ impl EndpointHandler for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } diff --git a/src/table/data.rs b/src/table/data.rs index 5c792f1f..d19586f3 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -181,13 +181,17 @@ impl TableData { pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&update); - ent - } - None => update.clone(), - })?; + self.update_entry_with( + update.partition_key(), + update.sort_key(), + |_tx, ent| match ent { + Some(mut ent) => { + ent.merge(&update); + Ok(ent) + } + None => Ok(update.clone()), + }, + )?; Ok(()) } @@ -195,7 +199,7 @@ impl TableData { &self, partition_key: &F::P, sort_key: &F::S, - f: impl Fn(Option) -> F::E, + update_fn: impl Fn(&mut db::Transaction, Option) -> db::TxResult, ) -> Result, Error> { let tree_key = self.tree_key(partition_key, sort_key); @@ -203,10 +207,10 @@ impl TableData { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = f(Some(old_entry.clone())); + let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, f(None)), + None => (None, None, update_fn(&mut tx, None)?), }; // Changed can be true in two scenarios @@ -335,6 +339,7 @@ impl TableData { .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); Ok(()) @@ -344,7 +349,7 @@ impl TableData { pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); + ret.extend(s.sort_key().borrow()); ret } diff --git a/src/table/schema.rs b/src/table/schema.rs index 5cbf6c95..86ff816a 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 { /// Trait for field used to sort data pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { + type B<'a>: std::borrow::Borrow<[u8]>; + /// Get the key used to sort - fn sort_key(&self) -> &[u8]; + fn sort_key(&self) -> Self::B<'_>; } impl SortKey for String { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_bytes() } } impl SortKey for FixedBytes32 { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_slice() } diff --git a/src/table/util.rs b/src/table/util.rs index 0b10cf3f..a79e2cba 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -7,6 +7,8 @@ use crate::schema::*; #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct EmptyKey; impl SortKey for EmptyKey { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { &[] } -- 2.43.0 From b337895fcefd35d2a1c8965fe30c8eecb5dcd964 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 9 Jan 2023 12:43:10 +0100 Subject: [PATCH 2/3] Update Rust Nix toolchain --- flake.lock | 16 ++++++++-------- flake.nix | 4 ++-- nix/common.nix | 8 ++++---- nix/compile.nix | 2 +- nix/kaniko.nix | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/flake.lock b/flake.lock index 4b5713dc..f1f78b55 100644 --- a/flake.lock +++ b/flake.lock @@ -10,17 +10,17 @@ "rust-overlay": "rust-overlay" }, "locked": { - "lastModified": 1666087781, - "narHash": "sha256-trKVdjMZ8mNkGfLcY5LsJJGtdV3xJDZnMVrkFjErlcs=", + "lastModified": 1673262828, + "narHash": "sha256-pDqno5/2ghQDt4LjVt5ZUMV9pTSA5rGGdz6Skf2rBwc=", "owner": "Alexis211", "repo": "cargo2nix", - "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36", + "rev": "505caa32110d42ee03bd68b47031142eff9c827b", "type": "github" }, "original": { "owner": "Alexis211", "repo": "cargo2nix", - "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36", + "rev": "505caa32110d42ee03bd68b47031142eff9c827b", "type": "github" } }, @@ -57,17 +57,17 @@ }, "nixpkgs": { "locked": { - "lastModified": 1665657542, - "narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=", + "lastModified": 1673261889, + "narHash": "sha256-7trMsi0z7EfYNC/Nc5EtulvChBHQAo376XRICyWr89Q=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "a3073c49bc0163fea6a121c276f526837672b555", + "rev": "baed728abe983508cabc99d05cccc164fe748744", "type": "github" }, "original": { "owner": "NixOS", "repo": "nixpkgs", - "rev": "a3073c49bc0163fea6a121c276f526837672b555", + "rev": "baed728abe983508cabc99d05cccc164fe748744", "type": "github" } }, diff --git a/flake.nix b/flake.nix index 7d152195..2222895d 100644 --- a/flake.nix +++ b/flake.nix @@ -1,10 +1,10 @@ { description = "Garage, an S3-compatible distributed object store for self-hosted deployments"; - inputs.nixpkgs.url = "github:NixOS/nixpkgs/a3073c49bc0163fea6a121c276f526837672b555"; + inputs.nixpkgs.url = "github:NixOS/nixpkgs/baed728abe983508cabc99d05cccc164fe748744"; inputs.cargo2nix = { # As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection - url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36"; + url = "github:Alexis211/cargo2nix/505caa32110d42ee03bd68b47031142eff9c827b"; inputs.nixpkgs.follows = "nixpkgs"; }; diff --git a/nix/common.nix b/nix/common.nix index 90e3afaf..80b89102 100644 --- a/nix/common.nix +++ b/nix/common.nix @@ -3,15 +3,15 @@ rec { * Fixed dependencies */ pkgsSrc = fetchTarball { - # As of 2022-10-13 - url = "https://github.com/NixOS/nixpkgs/archive/a3073c49bc0163fea6a121c276f526837672b555.zip"; - sha256 = "1bz632psfbpmicyzjb8b4265y50shylccvfm6ry6mgnv5hvz324s"; + # As of 2023-01-09 + url = "https://github.com/NixOS/nixpkgs/archive/baed728abe983508cabc99d05cccc164fe748744.zip"; + sha256 = "1m7kmcjhnj3lx7xqs0nh262c4nxs5n8p7k9g6kc4gv1k5nrcrnpf"; }; cargo2nixSrc = fetchGit { # As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection url = "https://github.com/Alexis211/cargo2nix"; ref = "custom_unstable"; - rev = "a7a61179b66054904ef6a195d8da736eaaa06c36"; + rev = "505caa32110d42ee03bd68b47031142eff9c827b"; }; /* diff --git a/nix/compile.nix b/nix/compile.nix index 3ea5035e..5c6b02e3 100644 --- a/nix/compile.nix +++ b/nix/compile.nix @@ -42,7 +42,7 @@ let */ toolchainOptions = if target == null || target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then { - rustVersion = "1.63.0"; + rustVersion = "1.65.0"; extraRustComponents = [ "clippy" ]; } else { rustToolchain = pkgs.symlinkJoin { diff --git a/nix/kaniko.nix b/nix/kaniko.nix index 140328b8..e27aaee3 100644 --- a/nix/kaniko.nix +++ b/nix/kaniko.nix @@ -7,7 +7,7 @@ pkgs.buildGoModule rec { owner = "GoogleContainerTools"; repo = "kaniko"; rev = "v${version}"; - sha256 = "1fnclr556avxay6pvgw5ya3xbxfnf2gv4njq2hr4fd6fcjyslq5h"; + sha256 = "TXgzO/NfLXVo5a7yyO3XYSk+9H1CwMF+vwbRx3kchQ8="; }; vendorSha256 = null; -- 2.43.0 From 32715d462e44d9b6dee84ff3c1eb6163d4be4123 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 9 Jan 2023 14:53:27 +0100 Subject: [PATCH 3/3] history table refactoring --- src/model/garage.rs | 2 +- src/model/k2v/history_table.rs | 20 ++++++++++---------- src/model/k2v/item_table.rs | 2 +- src/model/k2v/mod.rs | 3 ++- src/model/k2v/rpc.rs | 11 +++++------ src/model/k2v/{poll.rs => sub.rs} | 8 ++++---- 6 files changed, 23 insertions(+), 23 deletions(-) rename src/model/k2v/{poll.rs => sub.rs} (93%) diff --git a/src/model/garage.rs b/src/model/garage.rs index a33265af..c0ffdd31 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{history_table::*, item_table::*, poll::*, rpc::*}; +use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*}; /// An entire Garage full of data pub struct Garage { diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs index 6a6e9a10..9df03f5d 100644 --- a/src/model/k2v/history_table.rs +++ b/src/model/k2v/history_table.rs @@ -5,18 +5,21 @@ use garage_db as db; use garage_table::crdt::*; use garage_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; mod v08 { use crate::k2v::causality::K2VNodeId; - pub use crate::k2v::item_table::v08::{DvvsValue, K2VItemPartition}; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition}; use garage_util::crdt; use serde::{Deserialize, Serialize}; #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct K2VHistoryEntry { - /// Partition key: a K2V partition - pub partition: K2VItemPartition, + // Partition key: the partition key of ins_item + + /// The inserted item + pub ins_item: K2VItem, + /// Sort key: the node ID and its local counter pub node_counter: K2VHistorySortKey, @@ -25,12 +28,8 @@ mod v08 { /// The timesamp of the update (!= counter, counters are incremented /// by one, timestamps are real clock timestamps) pub timestamp: u64, - /// The sort key of the item that was inserted - pub ins_sort_key: String, - /// The inserted value - pub ins_value: DvvsValue, - /// Whether this history entry is too old and should be deleted + /// Mark this history entry for deletion pub deleted: crdt::Bool, } @@ -49,6 +48,7 @@ pub use v08::*; impl Crdt for K2VHistoryEntry { fn merge(&mut self, other: &Self) { + self.ins_item.merge(&other.ins_item); self.deleted.merge(&other.deleted); } } @@ -66,7 +66,7 @@ impl SortKey for K2VHistorySortKey { impl Entry for K2VHistoryEntry { fn partition_key(&self) -> &K2VItemPartition { - &self.partition + &self.ins_item.partition } fn sort_key(&self) -> &K2VHistorySortKey { &self.node_counter diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 90a2f4d0..cf60fd3f 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 18deabac..4f7de5b7 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -3,5 +3,6 @@ pub mod causality; pub mod history_table; pub mod item_table; -pub mod poll; +pub(crate) mod sub; + pub mod rpc; diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 1dc396c0..c3cb5f9f 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -32,7 +32,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::history_table::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -292,8 +292,9 @@ impl K2VRpcHandler { self.item_table .data .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let local_counter_key = item.partition.hash(); let old_local_counter = tx - .get(&self.local_counter_tree, b"counter")? + .get(&self.local_counter_tree, &local_counter_key)? .and_then(|x| x.try_into().ok()) .map(u64::from_be_bytes) .unwrap_or_default(); @@ -314,20 +315,18 @@ impl K2VRpcHandler { tx.insert( &self.local_counter_tree, - b"counter", + &local_counter_key, u64::to_be_bytes(new_local_counter), )?; let hist_entry = K2VHistoryEntry { - partition: ent.partition.clone(), + ins_item: ent.clone(), node_counter: K2VHistorySortKey { node: make_node_id(self.system.id), counter: new_local_counter, }, prev_counter: old_local_counter, timestamp: now, - ins_sort_key: item.sort_key.clone(), - ins_value: item.value.clone(), deleted: false.into(), }; self.history_table.queue_insert(tx, &hist_entry)?; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/sub.rs similarity index 93% rename from src/model/k2v/poll.rs rename to src/model/k2v/sub.rs index ea3e8d41..1dcca4d5 100644 --- a/src/model/k2v/poll.rs +++ b/src/model/k2v/sub.rs @@ -95,21 +95,21 @@ impl SubscriptionManager { impl PollRange { fn matches(&self, entry: &K2VHistoryEntry) -> bool { - entry.partition == self.partition + entry.ins_item.partition == self.partition && self .prefix .as_ref() - .map(|x| entry.ins_sort_key.starts_with(x)) + .map(|x| entry.ins_item.sort_key.starts_with(x)) .unwrap_or(true) && self .start .as_ref() - .map(|x| entry.ins_sort_key >= *x) + .map(|x| entry.ins_item.sort_key >= *x) .unwrap_or(true) && self .end .as_ref() - .map(|x| entry.ins_sort_key < *x) + .map(|x| entry.ins_item.sort_key < *x) .unwrap_or(true) } } -- 2.43.0