forked from Deuxfleurs/garage
Compare commits
3 commits
main
...
k2v-watch-
Author | SHA1 | Date | |
---|---|---|---|
32715d462e | |||
b337895fce | |||
49b5d18554 |
16 changed files with 354 additions and 95 deletions
16
flake.lock
16
flake.lock
|
@ -10,17 +10,17 @@
|
||||||
"rust-overlay": "rust-overlay"
|
"rust-overlay": "rust-overlay"
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1666087781,
|
"lastModified": 1673262828,
|
||||||
"narHash": "sha256-trKVdjMZ8mNkGfLcY5LsJJGtdV3xJDZnMVrkFjErlcs=",
|
"narHash": "sha256-pDqno5/2ghQDt4LjVt5ZUMV9pTSA5rGGdz6Skf2rBwc=",
|
||||||
"owner": "Alexis211",
|
"owner": "Alexis211",
|
||||||
"repo": "cargo2nix",
|
"repo": "cargo2nix",
|
||||||
"rev": "a7a61179b66054904ef6a195d8da736eaaa06c36",
|
"rev": "505caa32110d42ee03bd68b47031142eff9c827b",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "Alexis211",
|
"owner": "Alexis211",
|
||||||
"repo": "cargo2nix",
|
"repo": "cargo2nix",
|
||||||
"rev": "a7a61179b66054904ef6a195d8da736eaaa06c36",
|
"rev": "505caa32110d42ee03bd68b47031142eff9c827b",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -57,17 +57,17 @@
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1665657542,
|
"lastModified": 1673261889,
|
||||||
"narHash": "sha256-mojxNyzbvmp8NtVtxqiHGhRfjCALLfk9i/Uup68Y5q8=",
|
"narHash": "sha256-7trMsi0z7EfYNC/Nc5EtulvChBHQAo376XRICyWr89Q=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "a3073c49bc0163fea6a121c276f526837672b555",
|
"rev": "baed728abe983508cabc99d05cccc164fe748744",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "a3073c49bc0163fea6a121c276f526837672b555",
|
"rev": "baed728abe983508cabc99d05cccc164fe748744",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
{
|
{
|
||||||
description = "Garage, an S3-compatible distributed object store for self-hosted deployments";
|
description = "Garage, an S3-compatible distributed object store for self-hosted deployments";
|
||||||
|
|
||||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/a3073c49bc0163fea6a121c276f526837672b555";
|
inputs.nixpkgs.url = "github:NixOS/nixpkgs/baed728abe983508cabc99d05cccc164fe748744";
|
||||||
inputs.cargo2nix = {
|
inputs.cargo2nix = {
|
||||||
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
||||||
url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36";
|
url = "github:Alexis211/cargo2nix/505caa32110d42ee03bd68b47031142eff9c827b";
|
||||||
inputs.nixpkgs.follows = "nixpkgs";
|
inputs.nixpkgs.follows = "nixpkgs";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -3,15 +3,15 @@ rec {
|
||||||
* Fixed dependencies
|
* Fixed dependencies
|
||||||
*/
|
*/
|
||||||
pkgsSrc = fetchTarball {
|
pkgsSrc = fetchTarball {
|
||||||
# As of 2022-10-13
|
# As of 2023-01-09
|
||||||
url = "https://github.com/NixOS/nixpkgs/archive/a3073c49bc0163fea6a121c276f526837672b555.zip";
|
url = "https://github.com/NixOS/nixpkgs/archive/baed728abe983508cabc99d05cccc164fe748744.zip";
|
||||||
sha256 = "1bz632psfbpmicyzjb8b4265y50shylccvfm6ry6mgnv5hvz324s";
|
sha256 = "1m7kmcjhnj3lx7xqs0nh262c4nxs5n8p7k9g6kc4gv1k5nrcrnpf";
|
||||||
};
|
};
|
||||||
cargo2nixSrc = fetchGit {
|
cargo2nixSrc = fetchGit {
|
||||||
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
# As of 2022-10-18: two small patches over unstable branch, one for clippy and one to fix feature detection
|
||||||
url = "https://github.com/Alexis211/cargo2nix";
|
url = "https://github.com/Alexis211/cargo2nix";
|
||||||
ref = "custom_unstable";
|
ref = "custom_unstable";
|
||||||
rev = "a7a61179b66054904ef6a195d8da736eaaa06c36";
|
rev = "505caa32110d42ee03bd68b47031142eff9c827b";
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -42,7 +42,7 @@ let
|
||||||
*/
|
*/
|
||||||
toolchainOptions =
|
toolchainOptions =
|
||||||
if target == null || target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then {
|
if target == null || target == "x86_64-unknown-linux-musl" || target == "aarch64-unknown-linux-musl" then {
|
||||||
rustVersion = "1.63.0";
|
rustVersion = "1.65.0";
|
||||||
extraRustComponents = [ "clippy" ];
|
extraRustComponents = [ "clippy" ];
|
||||||
} else {
|
} else {
|
||||||
rustToolchain = pkgs.symlinkJoin {
|
rustToolchain = pkgs.symlinkJoin {
|
||||||
|
|
|
@ -7,7 +7,7 @@ pkgs.buildGoModule rec {
|
||||||
owner = "GoogleContainerTools";
|
owner = "GoogleContainerTools";
|
||||||
repo = "kaniko";
|
repo = "kaniko";
|
||||||
rev = "v${version}";
|
rev = "v${version}";
|
||||||
sha256 = "1fnclr556avxay6pvgw5ya3xbxfnf2gv4njq2hr4fd6fcjyslq5h";
|
sha256 = "TXgzO/NfLXVo5a7yyO3XYSk+9H1CwMF+vwbRx3kchQ8=";
|
||||||
};
|
};
|
||||||
|
|
||||||
vendorSha256 = null;
|
vendorSha256 = null;
|
||||||
|
|
|
@ -211,7 +211,7 @@ pub async fn handle_poll_item(
|
||||||
let item = garage
|
let item = garage
|
||||||
.k2v
|
.k2v
|
||||||
.rpc
|
.rpc
|
||||||
.poll(
|
.poll_item(
|
||||||
bucket_id,
|
bucket_id,
|
||||||
partition_key,
|
partition_key,
|
||||||
sort_key,
|
sort_key,
|
||||||
|
|
|
@ -27,7 +27,7 @@ use crate::index_counter::*;
|
||||||
use crate::key_table::*;
|
use crate::key_table::*;
|
||||||
|
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
use crate::k2v::{item_table::*, poll::*, rpc::*};
|
use crate::k2v::{history_table::*, item_table::*, sub::*, rpc::*};
|
||||||
|
|
||||||
/// An entire Garage full of data
|
/// An entire Garage full of data
|
||||||
pub struct Garage {
|
pub struct Garage {
|
||||||
|
@ -70,6 +70,8 @@ pub struct Garage {
|
||||||
pub struct GarageK2V {
|
pub struct GarageK2V {
|
||||||
/// Table containing K2V items
|
/// Table containing K2V items
|
||||||
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
|
/// Table containing K2V modification history
|
||||||
|
pub history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
|
||||||
/// Indexing table containing K2V item counters
|
/// Indexing table containing K2V item counters
|
||||||
pub counter_table: Arc<IndexCounter<K2VItem>>,
|
pub counter_table: Arc<IndexCounter<K2VItem>>,
|
||||||
/// K2V RPC handler
|
/// K2V RPC handler
|
||||||
|
@ -305,22 +307,42 @@ impl GarageK2V {
|
||||||
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
|
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
|
||||||
info!("Initialize K2V counter table...");
|
info!("Initialize K2V counter table...");
|
||||||
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
|
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
|
||||||
|
|
||||||
info!("Initialize K2V subscription manager...");
|
info!("Initialize K2V subscription manager...");
|
||||||
let subscriptions = Arc::new(SubscriptionManager::new());
|
let subscriptions = Arc::new(SubscriptionManager::new());
|
||||||
|
|
||||||
info!("Initialize K2V item table...");
|
info!("Initialize K2V item table...");
|
||||||
let item_table = Table::new(
|
let item_table = Table::new(
|
||||||
K2VItemTable {
|
K2VItemTable {
|
||||||
counter_table: counter_table.clone(),
|
counter_table: counter_table.clone(),
|
||||||
subscriptions: subscriptions.clone(),
|
subscriptions: subscriptions.clone(),
|
||||||
},
|
},
|
||||||
|
meta_rep_param.clone(),
|
||||||
|
system.clone(),
|
||||||
|
db,
|
||||||
|
);
|
||||||
|
info!("Initialize K2V history table...");
|
||||||
|
let history_table = Table::new(
|
||||||
|
K2VHistoryTable {
|
||||||
|
subscriptions: subscriptions.clone(),
|
||||||
|
},
|
||||||
meta_rep_param,
|
meta_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
db,
|
db,
|
||||||
);
|
);
|
||||||
let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
|
|
||||||
|
info!("Initialize K2V RPC handler...");
|
||||||
|
let rpc = K2VRpcHandler::new(
|
||||||
|
system,
|
||||||
|
db,
|
||||||
|
item_table.clone(),
|
||||||
|
history_table.clone(),
|
||||||
|
subscriptions,
|
||||||
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
item_table,
|
item_table,
|
||||||
|
history_table,
|
||||||
counter_table,
|
counter_table,
|
||||||
rpc,
|
rpc,
|
||||||
}
|
}
|
||||||
|
|
107
src/model/k2v/history_table.rs
Normal file
107
src/model/k2v/history_table.rs
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use garage_db as db;
|
||||||
|
|
||||||
|
use garage_table::crdt::*;
|
||||||
|
use garage_table::*;
|
||||||
|
|
||||||
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
|
mod v08 {
|
||||||
|
use crate::k2v::causality::K2VNodeId;
|
||||||
|
pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition};
|
||||||
|
use garage_util::crdt;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct K2VHistoryEntry {
|
||||||
|
// Partition key: the partition key of ins_item
|
||||||
|
|
||||||
|
/// The inserted item
|
||||||
|
pub ins_item: K2VItem,
|
||||||
|
|
||||||
|
/// Sort key: the node ID and its local counter
|
||||||
|
pub node_counter: K2VHistorySortKey,
|
||||||
|
|
||||||
|
/// The value of the node's local counter before this entry was updated
|
||||||
|
pub prev_counter: u64,
|
||||||
|
/// The timesamp of the update (!= counter, counters are incremented
|
||||||
|
/// by one, timestamps are real clock timestamps)
|
||||||
|
pub timestamp: u64,
|
||||||
|
|
||||||
|
/// Mark this history entry for deletion
|
||||||
|
pub deleted: crdt::Bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct K2VHistorySortKey {
|
||||||
|
pub node: K2VNodeId,
|
||||||
|
pub counter: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::InitialFormat for K2VHistoryEntry {
|
||||||
|
const VERSION_MARKER: &'static [u8] = b"Gk2vhe08";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use v08::*;
|
||||||
|
|
||||||
|
impl Crdt for K2VHistoryEntry {
|
||||||
|
fn merge(&mut self, other: &Self) {
|
||||||
|
self.ins_item.merge(&other.ins_item);
|
||||||
|
self.deleted.merge(&other.deleted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SortKey for K2VHistorySortKey {
|
||||||
|
type B<'a> = [u8; 16];
|
||||||
|
|
||||||
|
fn sort_key(&self) -> [u8; 16] {
|
||||||
|
let mut ret = [0u8; 16];
|
||||||
|
ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node));
|
||||||
|
ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter));
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry {
|
||||||
|
fn partition_key(&self) -> &K2VItemPartition {
|
||||||
|
&self.ins_item.partition
|
||||||
|
}
|
||||||
|
fn sort_key(&self) -> &K2VHistorySortKey {
|
||||||
|
&self.node_counter
|
||||||
|
}
|
||||||
|
fn is_tombstone(&self) -> bool {
|
||||||
|
self.deleted.get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct K2VHistoryTable {
|
||||||
|
pub(crate) subscriptions: Arc<SubscriptionManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TableSchema for K2VHistoryTable {
|
||||||
|
const TABLE_NAME: &'static str = "k2v_history";
|
||||||
|
|
||||||
|
type P = K2VItemPartition;
|
||||||
|
type S = K2VHistorySortKey;
|
||||||
|
type E = K2VHistoryEntry;
|
||||||
|
type Filter = DeletedFilter;
|
||||||
|
|
||||||
|
fn updated(
|
||||||
|
&self,
|
||||||
|
_tx: &mut db::Transaction,
|
||||||
|
_old: Option<&Self::E>,
|
||||||
|
new: Option<&Self::E>,
|
||||||
|
) -> db::TxOpResult<()> {
|
||||||
|
if let Some(new_ent) = new {
|
||||||
|
self.subscriptions.notify_range(new_ent);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||||
|
filter.apply(entry.deleted.get())
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,14 +11,14 @@ use garage_table::*;
|
||||||
|
|
||||||
use crate::index_counter::*;
|
use crate::index_counter::*;
|
||||||
use crate::k2v::causality::*;
|
use crate::k2v::causality::*;
|
||||||
use crate::k2v::poll::*;
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
pub const ENTRIES: &str = "entries";
|
pub const ENTRIES: &str = "entries";
|
||||||
pub const CONFLICTS: &str = "conflicts";
|
pub const CONFLICTS: &str = "conflicts";
|
||||||
pub const VALUES: &str = "values";
|
pub const VALUES: &str = "values";
|
||||||
pub const BYTES: &str = "bytes";
|
pub const BYTES: &str = "bytes";
|
||||||
|
|
||||||
mod v08 {
|
pub(super) mod v08 {
|
||||||
use crate::k2v::causality::K2VNodeId;
|
use crate::k2v::causality::K2VNodeId;
|
||||||
use garage_util::data::Uuid;
|
use garage_util::data::Uuid;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -73,7 +73,8 @@ impl K2VItem {
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
context: &Option<CausalContext>,
|
context: &Option<CausalContext>,
|
||||||
new_value: DvvsValue,
|
new_value: DvvsValue,
|
||||||
) {
|
node_counter: u64,
|
||||||
|
) -> u64 {
|
||||||
if let Some(context) = context {
|
if let Some(context) = context {
|
||||||
for (node, t_discard) in context.vector_clock.iter() {
|
for (node, t_discard) in context.vector_clock.iter() {
|
||||||
if let Some(e) = self.items.get_mut(node) {
|
if let Some(e) = self.items.get_mut(node) {
|
||||||
|
@ -98,7 +99,9 @@ impl K2VItem {
|
||||||
values: vec![],
|
values: vec![],
|
||||||
});
|
});
|
||||||
let t_prev = e.max_time();
|
let t_prev = e.max_time();
|
||||||
e.values.push((t_prev + 1, new_value));
|
let t_new = std::cmp::max(node_counter + 1, t_prev + 1);
|
||||||
|
e.values.push((t_new, new_value));
|
||||||
|
t_new
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract the causality context of a K2V Item
|
/// Extract the causality context of a K2V Item
|
||||||
|
@ -237,7 +240,7 @@ impl TableSchema for K2VItemTable {
|
||||||
|
|
||||||
// 2. Notify
|
// 2. Notify
|
||||||
if let Some(new_ent) = new {
|
if let Some(new_ent) = new {
|
||||||
self.subscriptions.notify(new_ent);
|
self.subscriptions.notify_item(new_ent);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
pub mod causality;
|
pub mod causality;
|
||||||
|
|
||||||
|
pub mod history_table;
|
||||||
pub mod item_table;
|
pub mod item_table;
|
||||||
|
|
||||||
pub mod poll;
|
pub(crate) mod sub;
|
||||||
|
|
||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
|
|
|
@ -1,50 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
|
|
||||||
use crate::k2v::item_table::*;
|
|
||||||
|
|
||||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
|
||||||
pub struct PollKey {
|
|
||||||
pub partition: K2VItemPartition,
|
|
||||||
pub sort_key: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct SubscriptionManager {
|
|
||||||
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SubscriptionManager {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
|
||||||
let mut subs = self.subscriptions.lock().unwrap();
|
|
||||||
if let Some(s) = subs.get(key) {
|
|
||||||
s.subscribe()
|
|
||||||
} else {
|
|
||||||
let (tx, rx) = broadcast::channel(8);
|
|
||||||
subs.insert(key.clone(), tx);
|
|
||||||
rx
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn notify(&self, item: &K2VItem) {
|
|
||||||
let key = PollKey {
|
|
||||||
partition: item.partition.clone(),
|
|
||||||
sort_key: item.sort_key.clone(),
|
|
||||||
};
|
|
||||||
let mut subs = self.subscriptions.lock().unwrap();
|
|
||||||
if let Some(s) = subs.get(&key) {
|
|
||||||
if s.send(item.clone()).is_err() {
|
|
||||||
// no more subscribers, remove channel from here
|
|
||||||
// (we will re-create it later if we need to subscribe again)
|
|
||||||
subs.remove(&key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -6,6 +6,7 @@
|
||||||
//! mean the vector clock gets much larger than needed).
|
//! mean the vector clock gets much larger than needed).
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -15,9 +16,12 @@ use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
|
|
||||||
|
use garage_db as db;
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
@ -26,8 +30,9 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||||
use garage_table::{PartitionKey, Table};
|
use garage_table::{PartitionKey, Table};
|
||||||
|
|
||||||
use crate::k2v::causality::*;
|
use crate::k2v::causality::*;
|
||||||
|
use crate::k2v::history_table::*;
|
||||||
use crate::k2v::item_table::*;
|
use crate::k2v::item_table::*;
|
||||||
use crate::k2v::poll::*;
|
use crate::k2v::sub::*;
|
||||||
|
|
||||||
/// RPC messages for K2V
|
/// RPC messages for K2V
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -59,6 +64,8 @@ impl Rpc for K2VRpc {
|
||||||
pub struct K2VRpcHandler {
|
pub struct K2VRpcHandler {
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
|
history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
|
||||||
|
local_counter_tree: db::Tree,
|
||||||
endpoint: Arc<Endpoint<K2VRpc, Self>>,
|
endpoint: Arc<Endpoint<K2VRpc, Self>>,
|
||||||
subscriptions: Arc<SubscriptionManager>,
|
subscriptions: Arc<SubscriptionManager>,
|
||||||
}
|
}
|
||||||
|
@ -66,14 +73,21 @@ pub struct K2VRpcHandler {
|
||||||
impl K2VRpcHandler {
|
impl K2VRpcHandler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
|
db: &db::Db,
|
||||||
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
|
history_table: Arc<Table<K2VHistoryTable, TableShardedReplication>>,
|
||||||
subscriptions: Arc<SubscriptionManager>,
|
subscriptions: Arc<SubscriptionManager>,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
|
let local_counter_tree = db
|
||||||
|
.open_tree("k2v_local_counter")
|
||||||
|
.expect("Unable to open DB tree for k2v local counter");
|
||||||
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
|
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
|
||||||
|
|
||||||
let rpc_handler = Arc::new(Self {
|
let rpc_handler = Arc::new(Self {
|
||||||
system,
|
system,
|
||||||
item_table,
|
item_table,
|
||||||
|
history_table,
|
||||||
|
local_counter_tree,
|
||||||
endpoint,
|
endpoint,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
});
|
});
|
||||||
|
@ -181,7 +195,7 @@ impl K2VRpcHandler {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn poll(
|
pub async fn poll_item(
|
||||||
&self,
|
&self,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
partition_key: String,
|
partition_key: String,
|
||||||
|
@ -273,9 +287,18 @@ impl K2VRpcHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
||||||
|
let now = now_msec();
|
||||||
|
|
||||||
self.item_table
|
self.item_table
|
||||||
.data
|
.data
|
||||||
.update_entry_with(&item.partition, &item.sort_key, |ent| {
|
.update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
|
||||||
|
let local_counter_key = item.partition.hash();
|
||||||
|
let old_local_counter = tx
|
||||||
|
.get(&self.local_counter_tree, &local_counter_key)?
|
||||||
|
.and_then(|x| x.try_into().ok())
|
||||||
|
.map(u64::from_be_bytes)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut ent = ent.unwrap_or_else(|| {
|
let mut ent = ent.unwrap_or_else(|| {
|
||||||
K2VItem::new(
|
K2VItem::new(
|
||||||
item.partition.bucket_id,
|
item.partition.bucket_id,
|
||||||
|
@ -283,13 +306,37 @@ impl K2VRpcHandler {
|
||||||
item.sort_key.clone(),
|
item.sort_key.clone(),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
ent.update(self.system.id, &item.causal_context, item.value.clone());
|
let new_local_counter = ent.update(
|
||||||
ent
|
self.system.id,
|
||||||
|
&item.causal_context,
|
||||||
|
item.value.clone(),
|
||||||
|
old_local_counter,
|
||||||
|
);
|
||||||
|
|
||||||
|
tx.insert(
|
||||||
|
&self.local_counter_tree,
|
||||||
|
&local_counter_key,
|
||||||
|
u64::to_be_bytes(new_local_counter),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let hist_entry = K2VHistoryEntry {
|
||||||
|
ins_item: ent.clone(),
|
||||||
|
node_counter: K2VHistorySortKey {
|
||||||
|
node: make_node_id(self.system.id),
|
||||||
|
counter: new_local_counter,
|
||||||
|
},
|
||||||
|
prev_counter: old_local_counter,
|
||||||
|
timestamp: now,
|
||||||
|
deleted: false.into(),
|
||||||
|
};
|
||||||
|
self.history_table.queue_insert(tx, &hist_entry)?;
|
||||||
|
|
||||||
|
Ok(ent)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
|
async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
|
||||||
let mut chan = self.subscriptions.subscribe(key);
|
let mut chan = self.subscriptions.subscribe_item(key);
|
||||||
|
|
||||||
let mut value = self
|
let mut value = self
|
||||||
.item_table
|
.item_table
|
||||||
|
@ -326,7 +373,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
|
||||||
} => {
|
} => {
|
||||||
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
|
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
|
||||||
select! {
|
select! {
|
||||||
ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
|
ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
|
||||||
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
|
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
115
src/model/k2v/sub.rs
Normal file
115
src/model/k2v/sub.rs
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use crate::k2v::history_table::*;
|
||||||
|
use crate::k2v::item_table::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct PollKey {
|
||||||
|
pub partition: K2VItemPartition,
|
||||||
|
pub sort_key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct PollRange {
|
||||||
|
pub partition: K2VItemPartition,
|
||||||
|
pub prefix: Option<String>,
|
||||||
|
pub start: Option<String>,
|
||||||
|
pub end: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct SubscriptionManager {
|
||||||
|
item_subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
|
||||||
|
range_subscriptions: Mutex<HashMap<PollRange, broadcast::Sender<K2VHistoryEntry>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscriptionManager {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- simple item polling ----
|
||||||
|
|
||||||
|
pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
|
||||||
|
let mut subs = self.item_subscriptions.lock().unwrap();
|
||||||
|
if let Some(s) = subs.get(key) {
|
||||||
|
s.subscribe()
|
||||||
|
} else {
|
||||||
|
let (tx, rx) = broadcast::channel(8);
|
||||||
|
subs.insert(key.clone(), tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn notify_item(&self, item: &K2VItem) {
|
||||||
|
let key = PollKey {
|
||||||
|
partition: item.partition.clone(),
|
||||||
|
sort_key: item.sort_key.clone(),
|
||||||
|
};
|
||||||
|
let mut subs = self.item_subscriptions.lock().unwrap();
|
||||||
|
if let Some(s) = subs.get(&key) {
|
||||||
|
if s.send(item.clone()).is_err() {
|
||||||
|
// no more subscribers, remove channel from here
|
||||||
|
// (we will re-create it later if we need to subscribe again)
|
||||||
|
subs.remove(&key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- range polling ----
|
||||||
|
|
||||||
|
pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VHistoryEntry> {
|
||||||
|
let mut subs = self.range_subscriptions.lock().unwrap();
|
||||||
|
if let Some(s) = subs.get(key) {
|
||||||
|
s.subscribe()
|
||||||
|
} else {
|
||||||
|
let (tx, rx) = broadcast::channel(8);
|
||||||
|
subs.insert(key.clone(), tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn notify_range(&self, entry: &K2VHistoryEntry) {
|
||||||
|
let mut subs = self.range_subscriptions.lock().unwrap();
|
||||||
|
let mut dead_subs = vec![];
|
||||||
|
|
||||||
|
for (sub, chan) in subs.iter() {
|
||||||
|
if sub.matches(&entry) {
|
||||||
|
if chan.send(entry.clone()).is_err() {
|
||||||
|
dead_subs.push(sub.clone());
|
||||||
|
}
|
||||||
|
} else if chan.receiver_count() == 0 {
|
||||||
|
dead_subs.push(sub.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for sub in dead_subs.iter() {
|
||||||
|
subs.remove(sub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PollRange {
|
||||||
|
fn matches(&self, entry: &K2VHistoryEntry) -> bool {
|
||||||
|
entry.ins_item.partition == self.partition
|
||||||
|
&& self
|
||||||
|
.prefix
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| entry.ins_item.sort_key.starts_with(x))
|
||||||
|
.unwrap_or(true)
|
||||||
|
&& self
|
||||||
|
.start
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| entry.ins_item.sort_key >= *x)
|
||||||
|
.unwrap_or(true)
|
||||||
|
&& self
|
||||||
|
.end
|
||||||
|
.as_ref()
|
||||||
|
.map(|x| entry.ins_item.sort_key < *x)
|
||||||
|
.unwrap_or(true)
|
||||||
|
}
|
||||||
|
}
|
|
@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
||||||
let update = self.decode_entry(update_bytes)?;
|
let update = self.decode_entry(update_bytes)?;
|
||||||
|
|
||||||
self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
|
self.update_entry_with(
|
||||||
Some(mut ent) => {
|
update.partition_key(),
|
||||||
ent.merge(&update);
|
update.sort_key(),
|
||||||
ent
|
|_tx, ent| match ent {
|
||||||
}
|
Some(mut ent) => {
|
||||||
None => update.clone(),
|
ent.merge(&update);
|
||||||
})?;
|
Ok(ent)
|
||||||
|
}
|
||||||
|
None => Ok(update.clone()),
|
||||||
|
},
|
||||||
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
&self,
|
&self,
|
||||||
partition_key: &F::P,
|
partition_key: &F::P,
|
||||||
sort_key: &F::S,
|
sort_key: &F::S,
|
||||||
f: impl Fn(Option<F::E>) -> F::E,
|
update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxResult<F::E, Error>,
|
||||||
) -> Result<Option<F::E>, Error> {
|
) -> Result<Option<F::E>, Error> {
|
||||||
let tree_key = self.tree_key(partition_key, sort_key);
|
let tree_key = self.tree_key(partition_key, sort_key);
|
||||||
|
|
||||||
|
@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||||
Some(old_bytes) => {
|
Some(old_bytes) => {
|
||||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||||
let new_entry = f(Some(old_entry.clone()));
|
let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
|
||||||
(Some(old_entry), Some(old_bytes), new_entry)
|
(Some(old_entry), Some(old_bytes), new_entry)
|
||||||
}
|
}
|
||||||
None => (None, None, f(None)),
|
None => (None, None, update_fn(&mut tx, None)?),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Changed can be true in two scenarios
|
// Changed can be true in two scenarios
|
||||||
|
@ -335,6 +339,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
.map_err(Error::RmpEncode)
|
.map_err(Error::RmpEncode)
|
||||||
.map_err(db::TxError::Abort)?;
|
.map_err(db::TxError::Abort)?;
|
||||||
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
|
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
|
||||||
|
|
||||||
self.insert_queue_notify.notify_one();
|
self.insert_queue_notify.notify_one();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -344,7 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
|
|
||||||
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
||||||
let mut ret = p.hash().to_vec();
|
let mut ret = p.hash().to_vec();
|
||||||
ret.extend(s.sort_key());
|
ret.extend(s.sort_key().borrow());
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 {
|
||||||
|
|
||||||
/// Trait for field used to sort data
|
/// Trait for field used to sort data
|
||||||
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
|
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
|
||||||
|
type B<'a>: std::borrow::Borrow<[u8]>;
|
||||||
|
|
||||||
/// Get the key used to sort
|
/// Get the key used to sort
|
||||||
fn sort_key(&self) -> &[u8];
|
fn sort_key(&self) -> Self::B<'_>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SortKey for String {
|
impl SortKey for String {
|
||||||
|
type B<'a> = &'a [u8];
|
||||||
|
|
||||||
fn sort_key(&self) -> &[u8] {
|
fn sort_key(&self) -> &[u8] {
|
||||||
self.as_bytes()
|
self.as_bytes()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SortKey for FixedBytes32 {
|
impl SortKey for FixedBytes32 {
|
||||||
|
type B<'a> = &'a [u8];
|
||||||
|
|
||||||
fn sort_key(&self) -> &[u8] {
|
fn sort_key(&self) -> &[u8] {
|
||||||
self.as_slice()
|
self.as_slice()
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,8 @@ use crate::schema::*;
|
||||||
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub struct EmptyKey;
|
pub struct EmptyKey;
|
||||||
impl SortKey for EmptyKey {
|
impl SortKey for EmptyKey {
|
||||||
|
type B<'a> = &'a [u8];
|
||||||
|
|
||||||
fn sort_key(&self) -> &[u8] {
|
fn sort_key(&self) -> &[u8] {
|
||||||
&[]
|
&[]
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue