diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index edfd9da8..cad0fc4a 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -142,6 +142,23 @@ impl ApiHandler for K2VApiServer { partition_key, sort_key, } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + Endpoint::PollItem { + partition_key, + sort_key, + causality_token, + timeout, + } => { + handle_poll_item( + garage, + &req, + bucket_id, + partition_key, + sort_key, + causality_token, + timeout, + ) + .await + } Endpoint::ReadIndex { prefix, start, @@ -151,8 +168,7 @@ impl ApiHandler for K2VApiServer { Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, - //TODO - endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + Endpoint::Options => unreachable!(), }; // If request was a success and we have a CORS rule that applies to it, diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index c27fdb6c..9284f00f 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -198,7 +198,7 @@ async fn handle_delete_batch_query( .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { Some(i) => { - let cc = i.causality_context(); + let cc = i.causal_context(); garage .k2v_rpc .insert( @@ -230,7 +230,7 @@ async fn handle_delete_batch_query( let items = items .into_iter() .map(|i| { - let cc = i.causality_context(); + let cc = i.causal_context(); ( i.partition.partition_key, i.sort_key, @@ -313,7 +313,7 @@ struct ReadBatchResponseItem { impl ReadBatchResponseItem { fn from(i: K2VItem) -> Self { - let ct = i.causality_context().serialize(); + let ct = i.causal_context().serialize(); let v = i .values() .iter() diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 63022320..7b340fe8 100644 --- a/src/api/k2v/item.rs +++ 
b/src/api/k2v/item.rs @@ -46,7 +46,7 @@ impl ReturnFormat { return Err(Error::NoSuchKey); } - let ct = item.causality_context().serialize(); + let ct = item.causal_context().serialize(); match self { Self::Binary if vals.len() > 1 => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) @@ -186,3 +186,36 @@ pub async fn handle_delete_item( .status(StatusCode::NO_CONTENT) .body(Body::empty())?) } + +/// Handle PollItem request: polls the item through the K2V RPC layer (timeout defaults to 300 seconds when not given) and returns it in the requested format, or 304 Not Modified when the poll returns no item +#[allow(clippy::ptr_arg)] +pub async fn handle_poll_item( + garage: Arc, + req: &Request, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causality_token: String, + timeout_secs: Option, +) -> Result, Error> { + let format = ReturnFormat::from(req)?; + + let item = garage + .k2v_rpc + .poll( + bucket_id, + partition_key, + sort_key, + causality_token, + timeout_secs.unwrap_or(300) * 1000, + ) + .await?; + + if let Some(item) = item { + format.make_response(&item) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index f545fab7..204051e2 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -30,6 +30,7 @@ pub enum Endpoint { partition_key: String, sort_key: String, causality_token: String, + timeout: Option, }, ReadBatch { }, @@ -96,7 +97,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), key: [ - EMPTY if causality_token => PollItem (query::sort_key, query::causality_token), + EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout), EMPTY => ReadItem (query::sort_key), ], no_key: [ @@ -235,7 +236,8 @@ generateQueryParameters! 
{ "causality_token" => causality_token, "end" => end, "limit" => limit, - "sort_key" => sort_key + "sort_key" => sort_key, + "timeout" => timeout } mod keywords { diff --git a/src/model/garage.rs b/src/model/garage.rs index 0ea4bc4a..164c298e 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -15,6 +15,7 @@ use garage_table::*; use crate::k2v::counter_table::*; use crate::k2v::item_table::*; +use crate::k2v::poll::*; use crate::k2v::rpc::*; use crate::s3::block_ref_table::*; use crate::s3::object_table::*; @@ -158,16 +159,21 @@ impl Garage { ); // ---- K2V ---- + info!("Initialize K2V counter table..."); let k2v_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize K2V subscription manager..."); + let k2v_subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let k2v_item_table = Table::new( K2VItemTable { counter_table: k2v_counter_table.clone(), + subscriptions: k2v_subscriptions.clone(), }, meta_rep_param, system.clone(), &db, ); - let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone()); + let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone(), k2v_subscriptions); info!("Initialize Garage..."); diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 3e7d4a46..03717a06 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -74,6 +74,12 @@ impl CausalContext { Ok(ret) } + /// Check if this causal context contains newer items than another one: true when at least one entry of this vector clock is strictly greater than the matching entry of `other` (an entry missing from `other` counts as 0) + pub fn is_newer_than(&self, other: &Self) -> bool { + self.vector_clock + .iter() + .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + } } #[cfg(test)] diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 1614a008..46aa258c 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -10,6 +10,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; use 
crate::k2v::counter_table::*; +use crate::k2v::poll::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { @@ -19,7 +20,7 @@ pub struct K2VItem { items: BTreeMap, } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)] pub struct K2VItemPartition { pub bucket_id: Uuid, pub partition_key: String, @@ -84,7 +85,7 @@ impl K2VItem { } /// Extract the causality context of a K2V Item - pub fn causality_context(&self) -> CausalContext { + pub fn causal_context(&self) -> CausalContext { let mut cc = CausalContext::new_empty(); for (node, ent) in self.items.iter() { cc.vector_clock.insert(*node, ent.max_time()); @@ -201,6 +202,7 @@ impl Entry for K2VItem { pub struct K2VItemTable { pub(crate) counter_table: Arc>, + pub(crate) subscriptions: Arc, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -246,6 +248,10 @@ impl TableSchema for K2VItemTable { ) { error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. 
{}", count_pk, count_sk, e); } + + if let Some(new_ent) = new { + self.subscriptions.notify(new_ent); + } } #[allow(clippy::nonminimal_bool)] diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index cfac965b..664172a6 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -3,4 +3,5 @@ pub mod causality; pub mod counter_table; pub mod item_table; +pub mod poll; pub mod rpc; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs new file mode 100644 index 00000000..93105207 --- /dev/null +++ b/src/model/k2v/poll.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Default)] +pub struct SubscriptionManager { + subscriptions: Mutex>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver { + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify(&self, item: &K2VItem) { + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(&key) { + if s.send(item.clone()).is_err() { + // send() failed, so no subscriber is left: drop this key's channel from the map + // (subscribe() re-creates the channel the next time someone subscribes to this key) + subs.remove(&key); + } + } + } +} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 25b02085..f016cb8c 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -7,12 +7,15 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use 
futures::stream::FuturesUnordered; use futures::StreamExt; use serde::{Deserialize, Serialize}; +use tokio::select; +use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; @@ -25,6 +28,7 @@ use garage_table::{PartitionKey, Table}; use crate::k2v::causality::*; use crate::k2v::item_table::*; +use crate::k2v::poll::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -32,6 +36,12 @@ enum K2VRpc { Ok, InsertItem(InsertedItem), InsertManyItems(Vec), + PollItem { + key: PollKey, + causal_context: String, + timeout_msec: u64, + }, + PollItemResponse(Option), } #[derive(Debug, Serialize, Deserialize)] @@ -51,12 +61,14 @@ pub struct K2VRpcHandler { system: Arc, item_table: Arc>, endpoint: Arc>, + subscriptions: Arc, } impl K2VRpcHandler { pub fn new( system: Arc, item_table: Arc>, + subscriptions: Arc, ) -> Arc { let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); @@ -64,6 +76,7 @@ impl K2VRpcHandler { system, item_table, endpoint, + subscriptions, }); rpc_handler.endpoint.set_handler(rpc_handler.clone()); @@ -171,6 +184,64 @@ impl K2VRpcHandler { Ok(()) } + pub async fn poll( + &self, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: String, + timeout_msec: u64, + ) -> Result, Error> { + let poll_key = PollKey { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + }; + let nodes = self + .item_table + .data + .replication + .write_nodes(&poll_key.partition.hash()); + + let resps = self + .system + .rpc + .try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollItem { + key: poll_key, + causal_context, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT), + ) + .await?; + + let mut resp: Option = None; + for v in resps { + match v { + K2VRpc::PollItemResponse(Some(x)) => { + if let Some(y) = &mut 
resp { + y.merge(&x); + } else { + resp = Some(x); + } + } + K2VRpc::PollItemResponse(None) => { + return Ok(None); + } + v => return Err(Error::unexpected_rpc_message(v)), + } + } + + Ok(resp) + } + // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { @@ -207,6 +278,32 @@ impl K2VRpcHandler { } Ok(K2VRpc::Ok) } + + async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result { + let ct = CausalContext::parse(ct)?; + + let mut chan = self.subscriptions.subscribe(key); + + let mut value = self + .item_table + .data + .read_entry(&key.partition, &key.sort_key)? + .map(|bytes| self.item_table.data.decode_entry(&bytes[..])) + .transpose()? + .unwrap_or_else(|| { + K2VItem::new( + key.partition.bucket_id, + key.partition.partition_key.clone(), + key.sort_key.clone(), + ) + }); + + while !value.causal_context().is_newer_than(&ct) { + value = chan.recv().await?; + } + + Ok(value) + } } #[async_trait] @@ -215,6 +312,17 @@ impl EndpointHandler for K2VRpcHandler { match message { K2VRpc::InsertItem(item) => self.handle_insert(item).await, K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, + K2VRpc::PollItem { + key, + causal_context, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! 
{ + ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + _ = delay => Ok(K2VRpc::PollItemResponse(None)), + } + } m => Err(Error::unexpected_rpc_message(m)), } } diff --git a/src/table/data.rs b/src/table/data.rs index 23ef4b4e..daa3c62a 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -267,7 +267,7 @@ where ret } - pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result { + pub fn decode_entry(&self, bytes: &[u8]) -> Result { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { diff --git a/src/util/error.rs b/src/util/error.rs index bdb3a69b..8734a0c8 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -44,6 +44,9 @@ pub enum Error { #[error(display = "Tokio semaphore acquire error: {}", _0)] TokioSemAcquire(#[error(source)] tokio::sync::AcquireError), + #[error(display = "Tokio broadcast receive error: {}", _0)] + TokioBcastRecv(#[error(source)] tokio::sync::broadcast::error::RecvError), + #[error(display = "Remote error: {}", _0)] RemoteError(String),