diff --git a/flake.lock b/flake.lock index 4b5713dc..f1f78b55 100644 --- a/flake.lock +++ b/flake.lock @@ -10,17 +10,17 @@ "rust-overlay": "rust-overlay" }, "locked": { - "lastModified": 1666087781, - "narHash": "sha256-trKVdjMZ8mNkGfLcY5LsJJGtdV3xJDZnMVrkFjErlcs=", + "lastModified": 1673262828, + "narHash": "sha256-pDqno5/2ghQDt4LjVt5ZUMV9pTSA5rGGdz6Skf2rBwc=", "owner": "Alexis211", "repo": "cargo2nix", - "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36", + "rev": "505caa32110d42ee03bd68b47031142eff9c827b", "type": "github" }, "original": { "owner": "Alexis211", "repo": "cargo2nix", - "rev": "a7a61179b66054904ef6a195d8da736eaaa06c36", + "rev": "505caa32110d42ee03bd68b47031142eff9c827b", "type": "github" } }, @@ -57,17 +57,17 @@ }, "nixpkgs": { "locked": { - "lastModified": 1665657542, - "narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=", + "lastModified": 1673261889, + "narHash": "sha256-7trMsi0z7EfYNC/Nc5EtulvChBHQAo376XRICyWr89Q=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "a3073c49bc0163fea6a121c276f526837672b555", + "rev": "baed728abe983508cabc99d05cccc164fe748744", "type": "github" }, "original": { "owner": "NixOS", "repo": "nixpkgs", - "rev": "a3073c49bc0163fea6a121c276f526837672b555", + "rev": "baed728abe983508cabc99d05cccc164fe748744", "type": "github" } }, diff --git a/flake.nix b/flake.nix index 7d152195..2222895d 100644 --- a/flake.nix +++ b/flake.nix @@ -1,10 +1,10 @@ { description = "Garage, an S3-compatible distributed object store for self-hosted deployments"; - inputs.nixpkgs.url = "github:NixOS/nixpkgs/a3073c49bc0163fea6a121c276f526837672b555"; + inputs.nixpkgs.url = "github:NixOS/nixpkgs/baed728abe983508cabc99d05cccc164fe748744"; inputs.cargo2nix = { # As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection - url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36"; + url = "github:Alexis211/cargo2nix/505caa32110d42ee03bd68b47031142eff9c827b"; inputs.nixpkgs.follows = "nixpkgs"; }; diff --git a/nix/common.nix b/nix/common.nix index 90e3afaf..80b89102 100644 --- a/nix/common.nix +++ b/nix/common.nix @@ -3,15 +3,15 @@ rec { * Fixed dependencies */ pkgsSrc = fetchTarball { - # As of 2022-10-13 - url = "https://github.com/NixOS/nixpkgs/archive/a3073c49bc0163fea6a121c276f526837672b555.zip"; - sha256 = "1bz632psfbpmicyzjb8b4265y50shylccvfm6ry6mgnv5hvz324s"; + # As of 2023-01-09 + url = "https://github.com/NixOS/nixpkgs/archive/baed728abe983508cabc99d05cccc164fe748744.zip"; + sha256 = "1m7kmcjhnj3lx7xqs0nh262c4nxs5n8p7k9g6kc4gv1k5nrcrnpf"; }; cargo2nixSrc = fetchGit { # As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection url = "https://github.com/Alexis211/cargo2nix"; ref = "custom_unstable"; - rev = "a7a61179b66054904ef6a195d8da736eaaa06c36"; + rev = "505caa32110d42ee03bd68b47031142eff9c827b"; }; /* diff --git a/nix/compile.nix b/nix/compile.nix index 3ea5035e..5c6b02e3 100644 --- a/nix/compile.nix +++ b/nix/compile.nix @@ -42,7 +42,7 @@ let */ toolchainOptions = if target == null || target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then { - rustVersion = "1.63.0"; + rustVersion = "1.65.0"; extraRustComponents = [ "clippy" ]; } else { rustToolchain = pkgs.symlinkJoin { diff --git a/nix/kaniko.nix b/nix/kaniko.nix index 140328b8..e27aaee3 100644 --- a/nix/kaniko.nix +++ b/nix/kaniko.nix @@ -7,7 +7,7 @@ pkgs.buildGoModule rec { owner = "GoogleContainerTools"; repo = "kaniko"; rev = "v${version}"; - sha256 = "1fnclr556avxay6pvgw5ya3xbxfnf2gv4njq2hr4fd6fcjyslq5h"; + sha256 = "TXgzO/NfLXVo5a7yyO3XYSk+9H1CwMF+vwbRx3kchQ8="; }; vendorSha256 = null; diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, diff --git a/src/model/garage.rs b/src/model/garage.rs index ac1846ce..c0ffdd31 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,7 +27,7 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -70,6 +70,8 @@ pub struct Garage { pub struct GarageK2V { /// Table containing K2V items pub item_table: Arc>, + /// Table containing K2V modification history + pub history_table: Arc>, /// Indexing table containing K2V item counters pub counter_table: Arc>, /// K2V RPC handler @@ -305,22 +307,42 @@ impl GarageK2V { fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { counter_table: counter_table.clone(), subscriptions: subscriptions.clone(), }, + meta_rep_param.clone(), + system.clone(), + db, + ); + info!("Initialize K2V history table..."); + let history_table = Table::new( + K2VHistoryTable { + subscriptions: subscriptions.clone(), + }, meta_rep_param, system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new( + system, + db, + item_table.clone(), + history_table.clone(), + subscriptions, + ); Self { item_table, + history_table, counter_table, rpc, } diff --git a/src/model/k2v/history_table.rs b/src/model/k2v/history_table.rs new file mode 100644 index 00000000..9df03f5d --- /dev/null +++ b/src/model/k2v/history_table.rs @@ -0,0 +1,107 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::k2v::sub::*; + +mod v08 { + use crate::k2v::causality::K2VNodeId; + pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition}; + use garage_util::crdt; + use serde::{Deserialize, Serialize}; + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistoryEntry { + // Partition key: the partition key of ins_item + + /// The inserted item + pub ins_item: K2VItem, + + /// Sort key: the node ID and its local counter + pub node_counter: K2VHistorySortKey, + + /// The value of the node's local counter before this entry was updated + pub prev_counter: u64, + /// The timesamp of the update (!= counter, counters are incremented + /// by one, timestamps are real clock timestamps) + pub timestamp: u64, + + /// Mark this history entry for deletion + pub deleted: crdt::Bool, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VHistorySortKey { + pub node: K2VNodeId, + pub counter: u64, + } + + impl garage_util::migrate::InitialFormat for K2VHistoryEntry { + const VERSION_MARKER: &'static [u8] = b"Gk2vhe08"; + } +} + +pub use v08::*; + +impl Crdt for K2VHistoryEntry { + fn merge(&mut self, other: &Self) { + self.ins_item.merge(&other.ins_item); + self.deleted.merge(&other.deleted); + } +} + +impl SortKey for K2VHistorySortKey { + type B<'a> = [u8; 16]; + + fn sort_key(&self) -> [u8; 16] { + let mut ret = [0u8; 16]; + ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node)); + ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter)); + ret + } +} + +impl Entry for K2VHistoryEntry { + fn partition_key(&self) -> &K2VItemPartition { + &self.ins_item.partition + } + fn sort_key(&self) -> &K2VHistorySortKey { + &self.node_counter + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +pub struct K2VHistoryTable { + pub(crate) subscriptions: Arc, +} + +impl TableSchema for K2VHistoryTable { + const TABLE_NAME: &'static str = "k2v_history"; + + type P = K2VItemPartition; + type S = K2VHistorySortKey; + type E = K2VHistoryEntry; + type Filter = DeletedFilter; + + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + if let Some(new_ent) = new { + self.subscriptions.notify_range(new_ent); + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.deleted.get()) + } +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index ce3e4129..cf60fd3f 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,14 +11,14 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -mod v08 { +pub(super) mod v08 { use crate::k2v::causality::K2VNodeId; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option, new_value: DvvsValue, - ) { + node_counter: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,7 +99,9 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(node_counter + 1, t_prev + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item @@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable { // 2. Notify if let Some(new_ent) = new { - self.subscriptions.notify(new_ent); + self.subscriptions.notify_item(new_ent); } Ok(()) diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..4f7de5b7 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,8 @@ pub mod causality; +pub mod history_table; pub mod item_table; -pub mod poll; +pub(crate) mod sub; + pub mod rpc; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index 93105207..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Default)] -pub struct SubscriptionManager { - subscriptions: Mutex>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver { - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } -} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..c3cb5f9f 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -6,6 +6,7 @@ //! mean the vector clock gets much larger than needed). use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -26,8 +30,9 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; +use crate::k2v::history_table::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -59,6 +64,8 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + history_table: Arc>, + local_counter_tree: db::Tree, endpoint: Arc>, subscriptions: Arc, } @@ -66,14 +73,21 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc, + db: &db::Db, item_table: Arc>, + history_table: Arc>, subscriptions: Arc, ) -> Arc { + let local_counter_tree = db + .open_tree("k2v_local_counter") + .expect("Unable to open DB tree for k2v local counter"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + history_table, + local_counter_tree, endpoint, subscriptions, }); @@ -181,7 +195,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -273,9 +287,18 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let local_counter_key = item.partition.hash(); + let old_local_counter = tx + .get(&self.local_counter_tree, &local_counter_key)? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +306,37 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_counter = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + old_local_counter, + ); + + tx.insert( + &self.local_counter_tree, + &local_counter_key, + u64::to_be_bytes(new_local_counter), + )?; + + let hist_entry = K2VHistoryEntry { + ins_item: ent.clone(), + node_counter: K2VHistorySortKey { + node: make_node_id(self.system.id), + counter: new_local_counter, + }, + prev_counter: old_local_counter, + timestamp: now, + deleted: false.into(), + }; + self.history_table.queue_insert(tx, &hist_entry)?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -326,7 +373,7 @@ impl EndpointHandler for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..1dcca4d5 --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,115 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::history_table::*; +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option, + pub start: Option, + pub end: Option, +} + +#[derive(Default)] +pub struct SubscriptionManager { + item_subscriptions: Mutex>>, + range_subscriptions: Mutex>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + // ---- simple item polling ---- + + pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver { + let mut subs = self.item_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_item(&self, item: &K2VItem) { + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + let mut subs = self.item_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + subs.remove(&key); + } + } + } + + // ---- range polling ---- + + pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver { + let mut subs = self.range_subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify_range(&self, entry: &K2VHistoryEntry) { + let mut subs = self.range_subscriptions.lock().unwrap(); + let mut dead_subs = vec![]; + + for (sub, chan) in subs.iter() { + if sub.matches(&entry) { + if chan.send(entry.clone()).is_err() { + dead_subs.push(sub.clone()); + } + } else if chan.receiver_count() == 0 { + dead_subs.push(sub.clone()); + } + } + + for sub in dead_subs.iter() { + subs.remove(sub); + } + } +} + +impl PollRange { + fn matches(&self, entry: &K2VHistoryEntry) -> bool { + entry.ins_item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| entry.ins_item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| entry.ins_item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| entry.ins_item.sort_key < *x) + .unwrap_or(true) + } +} diff --git a/src/table/data.rs b/src/table/data.rs index 5c792f1f..d19586f3 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -181,13 +181,17 @@ impl TableData { pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&update); - ent - } - None => update.clone(), - })?; + self.update_entry_with( + update.partition_key(), + update.sort_key(), + |_tx, ent| match ent { + Some(mut ent) => { + ent.merge(&update); + Ok(ent) + } + None => Ok(update.clone()), + }, + )?; Ok(()) } @@ -195,7 +199,7 @@ impl TableData { &self, partition_key: &F::P, sort_key: &F::S, - f: impl Fn(Option) -> F::E, + update_fn: impl Fn(&mut db::Transaction, Option) -> db::TxResult, ) -> Result, Error> { let tree_key = self.tree_key(partition_key, sort_key); @@ -203,10 +207,10 @@ impl TableData { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = f(Some(old_entry.clone())); + let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, f(None)), + None => (None, None, update_fn(&mut tx, None)?), }; // Changed can be true in two scenarios @@ -335,6 +339,7 @@ impl TableData { .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); Ok(()) @@ -344,7 +349,7 @@ impl TableData { pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); + ret.extend(s.sort_key().borrow()); ret } diff --git a/src/table/schema.rs b/src/table/schema.rs index 5cbf6c95..86ff816a 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 { /// Trait for field used to sort data pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { + type B<'a>: std::borrow::Borrow<[u8]>; + /// Get the key used to sort - fn sort_key(&self) -> &[u8]; + fn sort_key(&self) -> Self::B<'_>; } impl SortKey for String { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_bytes() } } impl SortKey for FixedBytes32 { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { self.as_slice() } diff --git a/src/table/util.rs b/src/table/util.rs index 0b10cf3f..a79e2cba 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -7,6 +7,8 @@ use crate::schema::*; #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct EmptyKey; impl SortKey for EmptyKey { + type B<'a> = &'a [u8]; + fn sort_key(&self) -> &[u8] { &[] }