Implement InsertBatch
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
Alex 2022-04-22 16:10:02 +02:00
parent 362e7570a3
commit 3d4d59e714
Signed by: lx
GPG key ID: 0E496D15096376BE
7 changed files with 186 additions and 33 deletions

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python
import os
import requests
from datetime import datetime
@ -90,3 +91,29 @@ response = requests.get('http://localhost:3812/alex',
auth=auth)
print(response.headers)
print(response.text)
# -- InsertBatch: POST two items under partition key "root" with no
# causality token (ct: null) and base64-encoded values.
print("-- InsertBatch")
response = requests.post('http://localhost:3812/alex',
auth=auth,
data='''
[
{"pk": "root", "sk": "a", "ct": null, "v": "aW5pdGlhbCB0ZXN0Cg=="},
{"pk": "root", "sk": "b", "ct": null, "v": "aW5pdGlhbCB0ZXN1Cg=="}
]
''');
print(response.headers)
print(response.text)
# -- ReadIndex: list the bucket's partition keys after the batch insert.
print("-- ReadIndex")
response = requests.get('http://localhost:3812/alex',
auth=auth)
print(response.headers)
print(response.text)
for sk in sort_keys:
print("-- (%s) Get"%sk)
response = requests.get('http://localhost:3812/alex/root?sort_key=%s'%sk,
auth=auth)
print(response.headers)
print(response.text)
ct = response.headers["x-garage-causality-token"]

View file

@ -19,6 +19,7 @@ use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
use crate::helpers::*;
use crate::k2v::batch::*;
use crate::k2v::index::*;
use crate::k2v::item::*;
use crate::k2v::router::Endpoint;
@ -147,6 +148,7 @@ impl ApiHandler for K2VApiServer {
end,
limit,
} => handle_read_index(garage, bucket_id, prefix, start, end, limit).await,
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
//TODO
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};

55
src/api/k2v/batch.rs Normal file
View file

@ -0,0 +1,55 @@
use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
use crate::error::*;
use crate::k2v::range::read_range;
/// Handle an InsertBatch API request.
///
/// Reads the request body as a JSON array of `InsertBatchItem`, decodes
/// each entry (optional causality token, base64 value or null = delete),
/// and hands the decoded items to the K2V RPC handler for insertion.
/// Returns 200 with an empty body on success.
pub async fn handle_insert_batch(
	garage: Arc<Garage>,
	bucket_id: Uuid,
	req: Request<Body>,
) -> Result<Response<Body>, Error> {
	let body = hyper::body::to_bytes(req.into_body()).await?;
	let items: Vec<InsertBatchItem> =
		serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;

	let mut decoded = Vec::with_capacity(items.len());
	for item in items {
		// The causality token is optional, but if present it must parse.
		let causality = match item.ct {
			Some(tok) => {
				Some(CausalContext::parse(&tok).ok_or_bad_request("Invalid causality token")?)
			}
			None => None,
		};
		// A null value encodes a deletion; otherwise the value is base64.
		let value = if let Some(vs) = item.v {
			DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
		} else {
			DvvsValue::Deleted
		};
		decoded.push((item.pk, item.sk, causality, value));
	}

	garage.k2v_rpc.insert_batch(bucket_id, decoded).await?;

	Ok(Response::builder()
		.status(StatusCode::OK)
		.body(Body::empty())?)
}
/// One item of an InsertBatch request body, as deserialized from JSON.
#[derive(Deserialize)]
struct InsertBatchItem {
// Partition key of the item.
pk: String,
// Sort key of the item.
sk: String,
// Optional causality token; when null, the write is made without
// a causal context (see handle_insert_batch).
ct: Option<String>,
// Base64-encoded value; null encodes a deletion.
v: Option<String>,
}

View file

@ -1,6 +1,7 @@
pub mod api_server;
mod router;
mod batch;
mod index;
mod item;

View file

@ -41,7 +41,10 @@ where
let mut entries = vec![];
loop {
let n_get = std::cmp::min(1000, limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2);
let n_get = std::cmp::min(
1000,
limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
);
let get_ret = table
.get_range(partition_key, Some(start.clone()), filter.clone(), n_get)
.await?;

View file

@ -57,7 +57,10 @@ impl<T: CounterSchema> CounterEntry<T> {
.map(|(_, (_, v))| *v)
.collect::<Vec<_>>();
if !new_vals.is_empty() {
ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)));
ret.insert(
name.clone(),
new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
);
}
}

View file

@ -5,9 +5,12 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use garage_util::data::*;
@ -25,14 +28,18 @@ use crate::k2v::item_table::*;
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
pub enum K2VRpc {
enum K2VRpc {
Ok,
InsertItem {
partition: K2VItemPartition,
sort_key: String,
causal_context: Option<CausalContext>,
value: DvvsValue,
},
InsertItem(InsertedItem),
InsertManyItems(Vec<InsertedItem>),
}
#[derive(Debug, Serialize, Deserialize)]
struct InsertedItem {
partition: K2VItemPartition,
sort_key: String,
causal_context: Option<CausalContext>,
value: DvvsValue,
}
impl Rpc for K2VRpc {
@ -89,12 +96,12 @@ impl K2VRpcHandler {
.try_call_many(
&self.endpoint,
&who[..],
K2VRpc::InsertItem {
K2VRpc::InsertItem(InsertedItem {
partition,
sort_key,
causal_context,
value,
},
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
.with_timeout(TABLE_RPC_TIMEOUT)
@ -105,29 +112,84 @@ impl K2VRpcHandler {
Ok(())
}
/// Insert a batch of items into the K2V item table.
///
/// Items are grouped by the set of nodes responsible for their partition,
/// so that a single InsertManyItems RPC per node set carries all the items
/// destined to it; the grouped RPCs are then issued concurrently.
pub async fn insert_batch(
&self,
bucket_id: Uuid,
// One tuple per item: (partition key, sort key, causality, value).
items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
) -> Result<(), Error> {
let n_items = items.len();
// Group items by write-node set: call_list maps a sorted node list to
// the items that must be sent to exactly those nodes.
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for (partition_key, sort_key, causal_context, value) in items {
let partition = K2VItemPartition {
bucket_id,
partition_key,
};
let mut who = self
.item_table
.data
.replication
.write_nodes(&partition.hash());
// Sort so that identical node sets compare equal as HashMap keys.
who.sort();
call_list.entry(who).or_default().push(InsertedItem {
partition,
sort_key,
causal_context,
value,
});
}
debug!(
"K2V insert_batch: {} requests to insert {} items",
call_list.len(),
n_items
);
// One RPC per distinct node set. With quorum 1 and
// interrupt_after_quorum, each call succeeds as soon as a single node
// has acknowledged the write.
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
.rpc
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
Ok::<_, Error>((nodes, resp))
});
// Drive all calls concurrently; propagate the first error, if any.
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
while let Some(resp) = resps.next().await {
resp?;
}
Ok(())
}
// ---- internal handlers ----
#[allow(clippy::ptr_arg)]
async fn handle_insert(
&self,
partition: &K2VItemPartition,
sort_key: &String,
causal_context: &Option<CausalContext>,
value: &DvvsValue,
) -> Result<K2VRpc, Error> {
let tree_key = self.item_table.data.tree_key(partition, sort_key);
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
let tree_key = self
.item_table
.data
.tree_key(&item.partition, &item.sort_key);
let new = self
.item_table
.data
.update_entry_with(&tree_key[..], |ent| {
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
partition.bucket_id,
partition.partition_key.clone(),
sort_key.clone(),
item.partition.bucket_id,
item.partition.partition_key.clone(),
item.sort_key.clone(),
)
});
ent.update(self.system.id, causal_context, value.clone());
ent.update(self.system.id, &item.causal_context, item.value.clone());
ent
})?;
@ -138,21 +200,21 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
/// Apply a batch of inserted items locally, one after the other,
/// stopping at the first error.
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
	for item in items {
		self.handle_insert(item).await?;
	}
	Ok(K2VRpc::Ok)
}
}
#[async_trait]
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
K2VRpc::InsertItem {
partition,
sort_key,
causal_context,
value,
} => {
self.handle_insert(partition, sort_key, causal_context, value)
.await
}
K2VRpc::InsertItem(item) => self.handle_insert(item).await,
K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
m => Err(Error::unexpected_rpc_message(m)),
}
}