K2V PollRange, version 2 #471

Merged
lx merged 18 commits from k2v-watch-range-2 into main 2023-01-26 16:19:05 +00:00
7 changed files with 117 additions and 59 deletions
Showing only changes of commit a48e2e0cb2 - Show all commits

View file

@ -211,7 +211,7 @@ pub async fn handle_poll_item(
let item = garage let item = garage
.k2v .k2v
.rpc .rpc
.poll( .poll_item(
bucket_id, bucket_id,
partition_key, partition_key,
sort_key, sort_key,

View file

@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*; use crate::key_table::*;
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
use crate::k2v::{item_table::*, poll::*, rpc::*}; use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data /// An entire Garage full of data
pub struct Garage { pub struct Garage {

View file

@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*; use crate::index_counter::*;
use crate::k2v::causality::*; use crate::k2v::causality::*;
use crate::k2v::poll::*; use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries"; pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts"; pub const CONFLICTS: &str = "conflicts";

View file

@ -2,5 +2,6 @@ pub mod causality;
pub mod item_table; pub mod item_table;
pub mod poll;
pub mod rpc; pub mod rpc;
pub(crate) mod sub;

View file

@ -1,50 +0,0 @@
use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollKey {
pub partition: K2VItemPartition,
pub sort_key: String,
}
#[derive(Default)]
pub struct SubscriptionManager {
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
subs.insert(key.clone(), tx);
rx
}
}
pub fn notify(&self, item: &K2VItem) {
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
subs.remove(&key);
}
}
}
}

View file

@ -27,7 +27,7 @@ use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*; use crate::k2v::causality::*;
use crate::k2v::item_table::*; use crate::k2v::item_table::*;
use crate::k2v::poll::*; use crate::k2v::sub::*;
/// RPC messages for K2V /// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -181,7 +181,7 @@ impl K2VRpcHandler {
Ok(()) Ok(())
} }
pub async fn poll( pub async fn poll_item(
&self, &self,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: String, partition_key: String,
@ -288,8 +288,8 @@ impl K2VRpcHandler {
}) })
} }
async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
let mut chan = self.subscriptions.subscribe(key); let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self let mut value = self
.item_table .item_table
@ -326,7 +326,7 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => { } => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! { select! {
ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)), _ = delay => Ok(K2VRpc::PollItemResponse(None)),
} }
} }

107
src/model/k2v/sub.rs Normal file
View file

@ -0,0 +1,107 @@
use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollKey {
pub partition: K2VItemPartition,
pub sort_key: String,
}
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollRange {
pub partition: K2VItemPartition,
pub prefix: Option<String>,
pub start: Option<String>,
pub end: Option<String>,
}
#[derive(Default)]
pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
#[derive(Default)]
pub struct SubscriptionManagerInner {
item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
range_subscriptions: HashMap<PollRange, broadcast::Sender<K2VItem>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.item_subscriptions.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
inner.item_subscriptions.insert(key.clone(), tx);
rx
}
}
pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VItem> {
let mut inner = self.0.lock().unwrap();
if let Some(s) = inner.range_subscriptions.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
inner.range_subscriptions.insert(key.clone(), tx);
rx
}
}
pub fn notify(&self, item: &K2VItem) {
let mut inner = self.0.lock().unwrap();
// 1. Notify single item subscribers,
// removing subscriptions with no more listeners if any
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
if let Some(s) = inner.item_subscriptions.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
inner.item_subscriptions.remove(&key);
}
}
// 2. Notify range subscribers,
// removing subscriptions with no more listeners if any
inner.range_subscriptions.retain(|sub, chan| {
if sub.matches(&item) {
chan.send(item.clone()).is_ok()
} else {
chan.receiver_count() != 0
}
});
}
}
impl PollRange {
fn matches(&self, item: &K2VItem) -> bool {
item.partition == self.partition
&& self
.prefix
.as_ref()
.map(|x| item.sort_key.starts_with(x))
.unwrap_or(true)
&& self
.start
.as_ref()
.map(|x| item.sort_key >= *x)
.unwrap_or(true)
&& self
.end
.as_ref()
.map(|x| item.sort_key < *x)
.unwrap_or(true)
}
}