K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
12 changed files with 243 additions and 12 deletions
Showing only changes of commit f6d5d8c532


@@ -142,6 +142,23 @@ impl ApiHandler for K2VApiServer {
partition_key,
sort_key,
} => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
Endpoint::PollItem {
partition_key,
sort_key,
causality_token,
timeout,
} => {
handle_poll_item(
garage,
&req,
bucket_id,
partition_key,
sort_key,
causality_token,
timeout,
)
.await
}
Endpoint::ReadIndex {
prefix,
start,
@@ -151,8 +168,7 @@ impl ApiHandler for K2VApiServer {
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
- //TODO
- endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ Endpoint::Options => unreachable!(),
};
// If request was a success and we have a CORS rule that applies to it,


@@ -198,7 +198,7 @@ async fn handle_delete_batch_query(
.filter(|e| K2VItemTable::matches_filter(e, &filter));
match item {
Some(i) => {
- let cc = i.causality_context();
+ let cc = i.causal_context();
garage
.k2v_rpc
.insert(
@@ -230,7 +230,7 @@ async fn handle_delete_batch_query(
let items = items
.into_iter()
.map(|i| {
- let cc = i.causality_context();
+ let cc = i.causal_context();
(
i.partition.partition_key,
i.sort_key,
@@ -313,7 +313,7 @@ struct ReadBatchResponseItem {
impl ReadBatchResponseItem {
fn from(i: K2VItem) -> Self {
- let ct = i.causality_context().serialize();
+ let ct = i.causal_context().serialize();
let v = i
.values()
.iter()


@@ -46,7 +46,7 @@ impl ReturnFormat {
return Err(Error::NoSuchKey);
}
- let ct = item.causality_context().serialize();
+ let ct = item.causal_context().serialize();
match self {
Self::Binary if vals.len() > 1 => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
@@ -186,3 +186,36 @@ pub async fn handle_delete_item(
.status(StatusCode::NO_CONTENT)
.body(Body::empty())?)
}
/// Handle PollItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
garage: Arc<Garage>,
req: &Request<Body>,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
) -> Result<Response<Body>, Error> {
let format = ReturnFormat::from(req)?;
let item = garage
.k2v_rpc
.poll(
bucket_id,
partition_key,
sort_key,
causality_token,
timeout_secs.unwrap_or(300) * 1000,
)
.await?;
if let Some(item) = item {
format.make_response(&item)
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
.body(Body::empty())?)
}
}
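
For context, not part of the diff: a client long-polls an item by replaying the causality token from a previous read, and the server answers 304 Not Modified when nothing newer arrives before the timeout (which handle_poll_item defaults to 300 seconds before converting to milliseconds). A rough client-side loop, assuming the query-parameter shape suggested by the router changes below, the reqwest crate, and no request signing; every name here is an assumption, not code from this PR:

use reqwest::StatusCode;

// Hypothetical long-poll loop against the K2V PollItem endpoint.
async fn poll_until_change(
    client: &reqwest::Client,
    url: &str,         // assumed shape: http://<host>/<bucket>/<partition_key>
    sort_key: &str,
    mut token: String, // causality token from a previous ReadItem
) -> reqwest::Result<String> {
    loop {
        let resp = client
            .get(url)
            .query(&[
                ("sort_key", sort_key),
                ("causality_token", token.as_str()),
                ("timeout", "60"),
            ])
            .send()
            .await?;
        if resp.status() == StatusCode::NOT_MODIFIED {
            continue; // server-side timeout fired; just poll again
        }
        // On a change, the new token comes back in the causality header.
        token = resp
            .headers()
            .get("X-Garage-Causality-Token")
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_string();
        return Ok(token);
    }
}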


@@ -30,6 +30,7 @@ pub enum Endpoint {
partition_key: String,
sort_key: String,
causality_token: String,
timeout: Option<u64>,
},
ReadBatch {
},
@@ -96,7 +97,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
- EMPTY if causality_token => PollItem (query::sort_key, query::causality_token),
+ EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout),
EMPTY => ReadItem (query::sort_key),
],
no_key: [
@@ -235,7 +236,8 @@ generateQueryParameters! {
"causality_token" => causality_token,
"end" => end,
"limit" => limit,
- "sort_key" => sort_key
+ "sort_key" => sort_key,
+ "timeout" => timeout
}
mod keywords {


@@ -15,6 +15,7 @@ use garage_table::*;
use crate::k2v::counter_table::*;
use crate::k2v::item_table::*;
use crate::k2v::poll::*;
use crate::k2v::rpc::*;
use crate::s3::block_ref_table::*;
use crate::s3::object_table::*;
@@ -158,16 +159,21 @@ impl Garage {
);
// ---- K2V ----
info!("Initialize K2V counter table...");
let k2v_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
info!("Initialize K2V subscription manager...");
let k2v_subscriptions = Arc::new(SubscriptionManager::new());
info!("Initialize K2V item table...");
let k2v_item_table = Table::new(
K2VItemTable {
counter_table: k2v_counter_table.clone(),
subscriptions: k2v_subscriptions.clone(),
},
meta_rep_param,
system.clone(),
&db,
);
- let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone());
+ let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone(), k2v_subscriptions);
info!("Initialize Garage...");


@@ -74,6 +74,12 @@ impl CausalContext {
Ok(ret)
}
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
self.vector_clock
.iter()
.any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
}
}
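
The dominance test reads: this context is newer than the other if any node's counter strictly exceeds the other's counter for the same node, with absent entries counting as zero. A standalone illustration (u64 node IDs and a bare BTreeMap stand in for the real CausalContext; that substitution is an assumption):

use std::collections::BTreeMap;

// Same any() test as is_newer_than above, over plain maps.
fn is_newer_than(a: &BTreeMap<u64, u64>, b: &BTreeMap<u64, u64>) -> bool {
    a.iter().any(|(k, v)| v > b.get(k).unwrap_or(&0))
}

fn main() {
    let a = BTreeMap::from([(1u64, 3u64), (2, 5)]);
    let b = BTreeMap::from([(1, 3), (2, 4)]);
    assert!(is_newer_than(&a, &b));  // node 2 advanced: 5 > 4
    assert!(!is_newer_than(&b, &a)); // no counter in b exceeds a's
}

Note that neither context being newer than the other means they are equal or concurrent; the poll loop below only needs the one direction.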
#[cfg(test)]


@@ -10,6 +10,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
use crate::k2v::counter_table::*;
use crate::k2v::poll::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
@@ -19,7 +20,7 @@ pub struct K2VItem {
items: BTreeMap<K2VNodeId, DvvsEntry>,
}
- #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
pub struct K2VItemPartition {
pub bucket_id: Uuid,
pub partition_key: String,
@@ -84,7 +85,7 @@ impl K2VItem {
}
/// Extract the causality context of a K2V Item
- pub fn causality_context(&self) -> CausalContext {
+ pub fn causal_context(&self) -> CausalContext {
let mut cc = CausalContext::new_empty();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
@@ -201,6 +202,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
pub struct K2VItemTable {
pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
pub(crate) subscriptions: Arc<SubscriptionManager>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -246,6 +248,10 @@ impl TableSchema for K2VItemTable {
) {
error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
}
if let Some(new_ent) = new {
self.subscriptions.notify(new_ent);
}
}
#[allow(clippy::nonminimal_bool)]


@@ -3,4 +3,5 @@ pub mod causality;
pub mod counter_table;
pub mod item_table;
pub mod poll;
pub mod rpc;

src/model/k2v/poll.rs (new file)

@@ -0,0 +1,50 @@
use std::collections::HashMap;
use std::sync::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use crate::k2v::item_table::*;
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PollKey {
pub partition: K2VItemPartition,
pub sort_key: String,
}
#[derive(Default)]
pub struct SubscriptionManager {
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(key) {
s.subscribe()
} else {
let (tx, rx) = broadcast::channel(8);
subs.insert(key.clone(), tx);
rx
}
}
pub fn notify(&self, item: &K2VItem) {
let key = PollKey {
partition: item.partition.clone(),
sort_key: item.sort_key.clone(),
};
let mut subs = self.subscriptions.lock().unwrap();
if let Some(s) = subs.get(&key) {
if s.send(item.clone()).is_err() {
// no more subscribers, remove channel from here
// (we will re-create it later if we need to subscribe again)
subs.remove(&key);
}
}
}
}
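
The scheme above is one broadcast channel per actively polled key, created lazily on subscribe and garbage-collected once send() reports that no receiver is left. A self-contained sketch of the same pattern, with a String key standing in for PollKey (illustrative only):

use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::broadcast;

#[derive(Default)]
struct Subs {
    channels: Mutex<HashMap<String, broadcast::Sender<String>>>,
}

impl Subs {
    fn subscribe(&self, key: &str) -> broadcast::Receiver<String> {
        let mut m = self.channels.lock().unwrap();
        m.entry(key.to_string())
            .or_insert_with(|| broadcast::channel(8).0)
            .subscribe()
    }
    fn notify(&self, key: &str, value: String) {
        let mut m = self.channels.lock().unwrap();
        if let Some(tx) = m.get(key) {
            if tx.send(value).is_err() {
                // send() only fails when no receiver is alive:
                // drop the channel, like notify() above does.
                m.remove(key);
            }
        }
    }
}

One consequence of the capacity-8 channel: a receiver that falls more than 8 notifications behind gets a Lagged error from recv(), which surfaces through the RecvError variant added to the error type at the end of this diff.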


@@ -7,12 +7,15 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
@@ -25,6 +28,7 @@ use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::poll::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -32,6 +36,12 @@ enum K2VRpc {
Ok,
InsertItem(InsertedItem),
InsertManyItems(Vec<InsertedItem>),
PollItem {
key: PollKey,
causal_context: String,
timeout_msec: u64,
},
PollItemResponse(Option<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@@ -51,12 +61,14 @@ pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
@@ -64,6 +76,7 @@ impl K2VRpcHandler {
system,
item_table,
endpoint,
subscriptions,
});
rpc_handler.endpoint.set_handler(rpc_handler.clone());
@@ -171,6 +184,64 @@ impl K2VRpcHandler {
Ok(())
}
pub async fn poll(
&self,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causal_context: String,
timeout_msec: u64,
) -> Result<Option<K2VItem>, Error> {
let poll_key = PollKey {
partition: K2VItemPartition {
bucket_id,
partition_key,
},
sort_key,
};
let nodes = self
.item_table
.data
.replication
.write_nodes(&poll_key.partition.hash());
let resps = self
.system
.rpc
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::PollItem {
key: poll_key,
causal_context,
timeout_msec,
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
.with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
)
.await?;
let mut resp: Option<K2VItem> = None;
for v in resps {
match v {
K2VRpc::PollItemResponse(Some(x)) => {
if let Some(y) = &mut resp {
y.merge(&x);
} else {
resp = Some(x);
}
}
K2VRpc::PollItemResponse(None) => {
return Ok(None);
}
v => return Err(Error::unexpected_rpc_message(v)),
}
}
Ok(resp)
}
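
The response loop gives poll its quorum semantics: every node that reports a newer value contributes its version, and versions are combined with the item's CRDT merge; a single None response, meaning that node's timer fired before anything newer arrived, short-circuits the whole poll into "no change". Schematically, with a caller-supplied merge function standing in for K2VItem::merge (a simplification, not the PR's API):

// Combine per-node poll responses; None from any node wins.
fn combine<T>(resps: Vec<Option<T>>, merge: impl Fn(&mut T, &T)) -> Option<T> {
    let mut acc: Option<T> = None;
    for r in resps {
        match r {
            Some(x) => match &mut acc {
                Some(y) => merge(y, &x),
                None => acc = Some(x),
            },
            // One node timed out => report "not modified" overall.
            None => return None,
        }
    }
    acc
}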
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
@@ -207,6 +278,32 @@ impl K2VRpcHandler {
}
Ok(K2VRpc::Ok)
}
async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result<K2VItem, Error> {
let ct = CausalContext::parse(ct)?;
let mut chan = self.subscriptions.subscribe(key);
let mut value = self
.item_table
.data
.read_entry(&key.partition, &key.sort_key)?
.map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
.transpose()?
.unwrap_or_else(|| {
K2VItem::new(
key.partition.bucket_id,
key.partition.partition_key.clone(),
key.sort_key.clone(),
)
});
while !value.causal_context().is_newer_than(&ct) {
value = chan.recv().await?;
}
Ok(value)
}
}
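
A detail worth calling out in handle_poll: the broadcast subscription is taken before the table read, so an insert landing between the read and the first recv() is still delivered through the channel; subscribing after reading would open a missed-wakeup window. The pattern in isolation (simplified types, not the PR's code):

use tokio::sync::broadcast;

// Wait until the current value satisfies a predicate, without
// missing updates that race with the initial read.
async fn wait_for<T: Clone>(
    subscribe: impl Fn() -> broadcast::Receiver<T>,
    read_current: impl Fn() -> T,
    is_ready: impl Fn(&T) -> bool,
) -> Result<T, broadcast::error::RecvError> {
    let mut rx = subscribe(); // subscribe FIRST...
    let mut value = read_current(); // ...then read the current state
    while !is_ready(&value) {
        value = rx.recv().await?;
    }
    Ok(value)
}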
#[async_trait]
@@ -215,6 +312,17 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
match message {
K2VRpc::InsertItem(item) => self.handle_insert(item).await,
K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
K2VRpc::PollItem {
key,
causal_context,
timeout_msec,
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
m => Err(Error::unexpected_rpc_message(m)),
}
}
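
The select! above races handle_poll against tokio::time::sleep for the client-supplied number of milliseconds, and maps a timeout to PollItemResponse(None), which the API layer translates into 304 Not Modified. The pattern in isolation (functionally the same as tokio::time::timeout):

use std::time::Duration;
use tokio::select;

// Race a future against a deadline; None means the deadline won.
async fn or_timeout<T>(fut: impl std::future::Future<Output = T>, msec: u64) -> Option<T> {
    select! {
        ret = fut => Some(ret),
        _ = tokio::time::sleep(Duration::from_millis(msec)) => None,
    }
}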


@@ -267,7 +267,7 @@
ret
}
- pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+ pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {


@@ -44,6 +44,9 @@ pub enum Error {
#[error(display = "Tokio semaphore acquire error: {}", _0)]
TokioSemAcquire(#[error(source)] tokio::sync::AcquireError),
#[error(display = "Tokio broadcast receive error: {}", _0)]
TokioBcastRecv(#[error(source)] tokio::sync::broadcast::error::RecvError),
#[error(display = "Remote error: {}", _0)]
RemoteError(String),