diff --git a/flake.lock b/flake.lock
index 4b5713dc..f1f78b55 100644
--- a/flake.lock
+++ b/flake.lock
@@ -10,17 +10,17 @@
"rust-overlay": "rust-overlay"
},
"locked": {
- "lastModified": 1666087781,
- "narHash": "sha256-trKVdjMZ8mNkGfLcY5LsJJGtdV3xJDZnMVrkFjErlcs=",
+ "lastModified": 1673262828,
+ "narHash": "sha256-pDqno5/2ghQDt4LjVt5ZUMV9pTSA5rGGdz6Skf2rBwc=",
"owner": "Alexis211",
"repo": "cargo2nix",
- "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36",
+ "rev": "505caa32110d42ee03bd68b47031142eff9c827b",
"type": "github"
},
"original": {
"owner": "Alexis211",
"repo": "cargo2nix",
- "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36",
+ "rev": "505caa32110d42ee03bd68b47031142eff9c827b",
"type": "github"
}
},
@@ -57,17 +57,17 @@
},
"nixpkgs": {
"locked": {
- "lastModified": 1665657542,
- "narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=",
+ "lastModified": 1673261889,
+ "narHash": "sha256-7trMsi0z7EfYNC/Nc5EtulvChBHQAo376XRICyWr89Q=",
"owner": "NixOS",
"repo": "nixpkgs",
- "rev": "a3073c49bc0163fea6a121c276f526837672b555",
+ "rev": "baed728abe983508cabc99d05cccc164fe748744",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
- "rev": "a3073c49bc0163fea6a121c276f526837672b555",
+ "rev": "baed728abe983508cabc99d05cccc164fe748744",
"type": "github"
}
},
diff --git a/flake.nix b/flake.nix
index 7d152195..2222895d 100644
--- a/flake.nix
+++ b/flake.nix
@@ -1,10 +1,10 @@
{
description = "Garage, an S3-compatible distributed object store for self-hosted deployments";
- inputs.nixpkgs.url = "github:NixOS/nixpkgs/a3073c49bc0163fea6a121c276f526837672b555";
+ inputs.nixpkgs.url = "github:NixOS/nixpkgs/baed728abe983508cabc99d05cccc164fe748744";
inputs.cargo2nix = {
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
- url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36";
+ url = "github:Alexis211/cargo2nix/505caa32110d42ee03bd68b47031142eff9c827b";
inputs.nixpkgs.follows = "nixpkgs";
};
diff --git a/nix/common.nix b/nix/common.nix
index 90e3afaf..80b89102 100644
--- a/nix/common.nix
+++ b/nix/common.nix
@@ -3,15 +3,15 @@ rec {
* Fixed dependencies
*/
pkgsSrc = fetchTarball {
- # As of 2022-10-13
- url = "https://github.com/NixOS/nixpkgs/archive/a3073c49bc0163fea6a121c276f526837672b555.zip";
- sha256 = "1bz632psfbpmicyzjb8b4265y50shylccvfm6ry6mgnv5hvz324s";
+ # As of 2023-01-09
+ url = "https://github.com/NixOS/nixpkgs/archive/baed728abe983508cabc99d05cccc164fe748744.zip";
+ sha256 = "1m7kmcjhnj3lx7xqs0nh262c4nxs5n8p7k9g6kc4gv1k5nrcrnpf";
};
cargo2nixSrc = fetchGit {
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
url = "https://github.com/Alexis211/cargo2nix";
ref = "custom_unstable";
- rev = "a7a61179b66054904ef6a195d8da736eaaa06c36";
+ rev = "505caa32110d42ee03bd68b47031142eff9c827b";
};
/*
diff --git a/nix/compile.nix b/nix/compile.nix
index 3ea5035e..5c6b02e3 100644
--- a/nix/compile.nix
+++ b/nix/compile.nix
@@ -42,7 +42,7 @@ let
*/
toolchainOptions =
if target == null || target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then {
- rustVersion = "1.63.0";
+ rustVersion = "1.65.0";
extraRustComponents = [ "clippy" ];
} else {
rustToolchain = pkgs.symlinkJoin {
diff --git a/nix/kaniko.nix b/nix/kaniko.nix
index 140328b8..e27aaee3 100644
--- a/nix/kaniko.nix
+++ b/nix/kaniko.nix
@@ -7,7 +7,7 @@ pkgs.buildGoModule rec {
owner = "GoogleContainerTools";
repo = "kaniko";
rev = "v${version}";
- sha256 = "1fnclr556avxay6pvgw5ya3xbxfnf2gv4njq2hr4fd6fcjyslq5h";
+ sha256 = "TXgzO/NfLXVo5a7yyO3XYSk+9H1CwMF+vwbRx3kchQ8=";
};
vendorSha256 = null;
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index f85138c7..9b78bc07 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -211,7 +211,7 @@ pub async fn handle_poll_item(
let item = garage
.k2v
.rpc
- .poll(
+ .poll_item(
bucket_id,
partition_key,
sort_key,
diff --git a/src/model/garage.rs b/src/model/garage.rs
index ac1846ce..c0ffdd31 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -70,6 +70,8 @@ pub struct Garage {
pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc
>,
+ /// Table containing K2V modification history
+ pub history_table: Arc>,
/// Indexing table containing K2V item counters
pub counter_table: Arc>,
/// K2V RPC handler
@@ -305,22 +307,42 @@ impl GarageK2V {
fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
+
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
counter_table: counter_table.clone(),
subscriptions: subscriptions.clone(),
},
+ meta_rep_param.clone(),
+ system.clone(),
+ db,
+ );
+ info!("Initialize K2V history table...");
+ let history_table = Table::new(
+ K2VHistoryTable {
+ subscriptions: subscriptions.clone(),
+ },
meta_rep_param,
system.clone(),
db,
);
- let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ info!("Initialize K2V RPC handler...");
+ let rpc = K2VRpcHandler::new(
+ system,
+ db,
+ item_table.clone(),
+ history_table.clone(),
+ subscriptions,
+ );
Self {
item_table,
+ history_table,
counter_table,
rpc,
}
diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs
new file mode 100644
index 00000000..9df03f5d
--- /dev/null
+++ b/src/model/k2v/history_table.rs
@@ -0,0 +1,107 @@
+use std::sync::Arc;
+
+use garage_db as db;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+use crate::k2v::sub::*;
+
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition};
+ use garage_util::crdt;
+ use serde::{Deserialize, Serialize};
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VHistoryEntry {
+ // Partition key: the partition key of ins_item (ins_item.partition, no separate field)
+
+ /// The inserted item
+ pub ins_item: K2VItem,
+
+ /// Sort key: the node ID and its local counter
+ pub node_counter: K2VHistorySortKey,
+
+ /// The value of the node's local counter before this entry was updated
+ pub prev_counter: u64,
+ /// The timestamp of the update (!= counter: counters are incremented
+ /// by one, timestamps are real clock timestamps)
+ pub timestamp: u64,
+
+ /// Mark this history entry for deletion (reported by `is_tombstone`)
+ pub deleted: crdt::Bool,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VHistorySortKey {
+ pub node: K2VNodeId, // node ID: bytes 0..8 of the encoded sort key
+ pub counter: u64, // that node's local counter: bytes 8..16 of the encoded sort key
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VHistoryEntry {
+ const VERSION_MARKER: &'static [u8] = b"Gk2vhe08";
+ }
+}
+
+pub use v08::*;
+
+impl Crdt for K2VHistoryEntry {
+ fn merge(&mut self, other: &Self) {
+ self.ins_item.merge(&other.ins_item); // merge the recorded item
+ self.deleted.merge(&other.deleted); // merge the deletion flag; all other fields keep self's values
+ }
+}
+
+impl SortKey for K2VHistorySortKey {
+ type B<'a> = [u8; 16];
+
+ fn sort_key(&self) -> [u8; 16] {
+ let mut ret = [0u8; 16]; // layout: 8 bytes of node ID, then 8 bytes of counter
+ ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node)); // big-endian node ID
+ ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter)); // big-endian counter
+ ret
+ }
+}
+
+impl Entry for K2VHistoryEntry {
+ fn partition_key(&self) -> &K2VItemPartition {
+ &self.ins_item.partition // a history entry uses the partition of the item it records
+ }
+ fn sort_key(&self) -> &K2VHistorySortKey {
+ &self.node_counter // the (node, counter) pair
+ }
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get() // tombstone iff the deleted flag is set
+ }
+}
+
+pub struct K2VHistoryTable {
+ pub(crate) subscriptions: Arc, // notify_range() is called on it from `updated`
+}
+
+impl TableSchema for K2VHistoryTable {
+ const TABLE_NAME: &'static str = "k2v_history";
+
+ type P = K2VItemPartition; // partition key type (same as the item table entries' partition)
+ type S = K2VHistorySortKey; // sort key type: (node, counter)
+ type E = K2VHistoryEntry; // entry type stored in this table
+ type Filter = DeletedFilter; // applied to the deleted flag in matches_filter
+
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction, // unused: nothing is written to the DB here
+ _old: Option<&Self::E>, // unused: the previous value is not inspected
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ if let Some(new_ent) = new {
+ self.subscriptions.notify_range(new_ent); // only reached when a new value exists
+ }
+
+ Ok(()) // this hook has no failure path
+ }
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ filter.apply(entry.deleted.get()) // the filter is given only the deleted flag
+ }
+}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index ce3e4129..cf60fd3f 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,14 +11,14 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-mod v08 {
+pub(super) mod v08 {
use crate::k2v::causality::K2VNodeId;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option,
new_value: DvvsValue,
- ) {
+ node_counter: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,7 +99,9 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(node_counter + 1, t_prev + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
@@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable {
// 2. Notify
if let Some(new_ent) = new {
- self.subscriptions.notify(new_ent);
+ self.subscriptions.notify_item(new_ent);
}
Ok(())
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..4f7de5b7 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,8 @@
pub mod causality;
+pub mod history_table;
pub mod item_table;
-pub mod poll;
+pub(crate) mod sub;
+
pub mod rpc;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
deleted file mode 100644
index 93105207..00000000
--- a/src/model/k2v/poll.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Mutex;
-
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast;
-
-use crate::k2v::item_table::*;
-
-#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PollKey {
- pub partition: K2VItemPartition,
- pub sort_key: String,
-}
-
-#[derive(Default)]
-pub struct SubscriptionManager {
- subscriptions: Mutex>>,
-}
-
-impl SubscriptionManager {
- pub fn new() -> Self {
- Self::default()
- }
-
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver {
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(key) {
- s.subscribe()
- } else {
- let (tx, rx) = broadcast::channel(8);
- subs.insert(key.clone(), tx);
- rx
- }
- }
-
- pub fn notify(&self, item: &K2VItem) {
- let key = PollKey {
- partition: item.partition.clone(),
- sort_key: item.sort_key.clone(),
- };
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(&key) {
- if s.send(item.clone()).is_err() {
- // no more subscribers, remove channel from here
- // (we will re-create it later if we need to subscribe again)
- subs.remove(&key);
- }
- }
- }
-}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..c3cb5f9f 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -6,6 +6,7 @@
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
+use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -26,8 +30,9 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
+use crate::k2v::history_table::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -59,6 +64,8 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc,
item_table: Arc>,
+ history_table: Arc>,
+ local_counter_tree: db::Tree,
endpoint: Arc>,
subscriptions: Arc,
}
@@ -66,14 +73,21 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc,
+ db: &db::Db,
item_table: Arc>,
+ history_table: Arc>,
subscriptions: Arc,
) -> Arc {
+ let local_counter_tree = db
+ .open_tree("k2v_local_counter")
+ .expect("Unable to open DB tree for k2v local counter");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ history_table,
+ local_counter_tree,
endpoint,
subscriptions,
});
@@ -181,7 +195,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -273,9 +287,18 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result