From 3d4d59e714814031e81eea94b5502c471ae41fcb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Apr 2022 16:10:02 +0200 Subject: [PATCH] Implement InsertBatch --- k2v_test.py | 27 ++++++++ src/api/k2v/api_server.rs | 2 + src/api/k2v/batch.rs | 55 ++++++++++++++++ src/api/k2v/mod.rs | 1 + src/api/k2v/range.rs | 5 +- src/model/index_counter.rs | 5 +- src/model/k2v/rpc.rs | 124 +++++++++++++++++++++++++++---------- 7 files changed, 186 insertions(+), 33 deletions(-) create mode 100644 src/api/k2v/batch.rs diff --git a/k2v_test.py b/k2v_test.py index 346883fe..eecffbc3 100755 --- a/k2v_test.py +++ b/k2v_test.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import os import requests from datetime import datetime @@ -90,3 +91,29 @@ response = requests.get('http://localhost:3812/alex', auth=auth) print(response.headers) print(response.text) + +print("-- InsertBatch") +response = requests.post('http://localhost:3812/alex', + auth=auth, + data=''' +[ + {"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="}, + {"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="} +] +'''); +print(response.headers) +print(response.text) + +print("-- ReadIndex") +response = requests.get('http://localhost:3812/alex', + auth=auth) +print(response.headers) +print(response.text) + +for sk in sort_keys: + print("-- (%s) Get"%sk) + response = requests.get('http://localhost:3812/alex/root?sort_key=%s'%sk, + auth=auth) + print(response.headers) + print(response.text) + ct = response.headers["x-garage-causality-token"] diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 0de04957..dfe66d0b 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; use crate::helpers::*; +use crate::k2v::batch::*; use crate::k2v::index::*; use crate::k2v::item::*; use crate::k2v::router::Endpoint; @@ -147,6 +148,7 @@ impl ApiHandler for K2VApiServer { end, limit, } => handle_read_index(garage, bucket_id, prefix, start, end, limit).await, + Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, //TODO endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs new file mode 100644 index 00000000..7568f0c9 --- /dev/null +++ b/src/api/k2v/batch.rs @@ -0,0 +1,55 @@ +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +use garage_table::util::*; + +use garage_model::garage::Garage; +use garage_model::k2v::causality::*; +use garage_model::k2v::item_table::*; + +use crate::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_insert_batch( + garage: Arc, + bucket_id: Uuid, + req: Request, +) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let items: Vec = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let mut items2 = vec![]; + for it in items { + let ct = it + .ct + .map(|s| CausalContext::parse(&s)) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + let v = match it.v { + Some(vs) => { + DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) + } + None => DvvsValue::Deleted, + }; + items2.push((it.pk, it.sk, ct, v)); + } + + garage.k2v_rpc.insert_batch(bucket_id, items2).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +#[derive(Deserialize)] +struct InsertBatchItem { + pk: String, + sk: String, + ct: Option, + v: Option, +} diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs index 62eeaa5b..ee210ad5 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/mod.rs @@ -1,6 +1,7 @@ pub mod api_server; mod router; +mod batch; mod index; mod item; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index 37ab7aa1..ae04d896 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -41,7 +41,10 @@ where let mut entries = vec![]; loop { - let n_get = std::cmp::min(1000, limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2); + let n_get = std::cmp::min( + 1000, + limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2, + ); let get_ret = table .get_range(partition_key, Some(start.clone()), filter.clone(), n_get) .await?; diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 701abb94..14db3523 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -57,7 +57,10 @@ impl CounterEntry { .map(|(_, (_, v))| *v) .collect::>(); if !new_vals.is_empty() { - ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b))); + ret.insert( + name.clone(), + new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)), + ); } } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 397496c9..25b02085 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -5,9 +5,12 @@ //! node does not process the entry directly, as this would //! mean the vector clock gets much larger than needed). +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -25,14 +28,18 @@ use crate::k2v::item_table::*; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] -pub enum K2VRpc { +enum K2VRpc { Ok, - InsertItem { - partition: K2VItemPartition, - sort_key: String, - causal_context: Option, - value: DvvsValue, - }, + InsertItem(InsertedItem), + InsertManyItems(Vec), +} + +#[derive(Debug, Serialize, Deserialize)] +struct InsertedItem { + partition: K2VItemPartition, + sort_key: String, + causal_context: Option, + value: DvvsValue, } impl Rpc for K2VRpc { @@ -89,12 +96,12 @@ impl K2VRpcHandler { .try_call_many( &self.endpoint, &who[..], - K2VRpc::InsertItem { + K2VRpc::InsertItem(InsertedItem { partition, sort_key, causal_context, value, - }, + }), RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(1) .with_timeout(TABLE_RPC_TIMEOUT) @@ -105,29 +112,84 @@ impl K2VRpcHandler { Ok(()) } + pub async fn insert_batch( + &self, + bucket_id: Uuid, + items: Vec<(String, String, Option, DvvsValue)>, + ) -> Result<(), Error> { + let n_items = items.len(); + + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); + + for (partition_key, sort_key, causal_context, value) in items { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + call_list.entry(who).or_default().push(InsertedItem { + partition, + sort_key, + causal_context, + value, + }); + } + + debug!( + "K2V insert_batch: {} requests to insert {} items", + call_list.len(), + n_items + ); + let call_futures = call_list.into_iter().map(|(nodes, items)| async move { + let resp = self + .system + .rpc + .try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::InsertManyItems(items), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + Ok::<_, Error>((nodes, resp)) + }); + + let mut resps = call_futures.collect::>(); + while let Some(resp) = resps.next().await { + resp?; + } + + Ok(()) + } + // ---- internal handlers ---- - #[allow(clippy::ptr_arg)] - async fn handle_insert( - &self, - partition: &K2VItemPartition, - sort_key: &String, - causal_context: &Option, - value: &DvvsValue, - ) -> Result { - let tree_key = self.item_table.data.tree_key(partition, sort_key); + async fn handle_insert(&self, item: &InsertedItem) -> Result { + let tree_key = self + .item_table + .data + .tree_key(&item.partition, &item.sort_key); let new = self .item_table .data .update_entry_with(&tree_key[..], |ent| { let mut ent = ent.unwrap_or_else(|| { K2VItem::new( - partition.bucket_id, - partition.partition_key.clone(), - sort_key.clone(), + item.partition.bucket_id, + item.partition.partition_key.clone(), + item.sort_key.clone(), ) }); - ent.update(self.system.id, causal_context, value.clone()); + ent.update(self.system.id, &item.causal_context, item.value.clone()); ent })?; @@ -138,21 +200,21 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } + + async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { + for i in items.iter() { + self.handle_insert(i).await?; + } + Ok(K2VRpc::Ok) + } } #[async_trait] impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { - K2VRpc::InsertItem { - partition, - sort_key, - causal_context, - value, - } => { - self.handle_insert(partition, sort_key, causal_context, value) - .await - } + K2VRpc::InsertItem(item) => self.handle_insert(item).await, + K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, m => Err(Error::unexpected_rpc_message(m)), } }