Optimize batch insertion

This commit is contained in:
Alex 2022-04-27 10:27:13 +02:00
parent 8fa25e882b
commit a4e21dffdf
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 33 additions and 17 deletions

View File

@ -245,12 +245,42 @@ impl K2VRpcHandler {
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
let new = self.local_insert(item)?;
// Propagate to rest of network
if let Some(updated) = new {
self.item_table.insert(&updated).await?;
}
Ok(K2VRpc::Ok)
}
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
for item in items {
let new = self.local_insert(item)?;
if let Some(updated) = new {
updated_vec.push(updated);
}
}
// Propagate to rest of network
if !updated_vec.is_empty() {
self.item_table.insert_many(&updated_vec).await?;
}
Ok(K2VRpc::Ok)
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
let tree_key = self
.item_table
.data
.tree_key(&item.partition, &item.sort_key);
let new = self
.item_table
self.item_table
.data
.update_entry_with(&tree_key[..], |ent| {
let mut ent = ent.unwrap_or_else(|| {
@ -262,21 +292,7 @@ impl K2VRpcHandler {
});
ent.update(self.system.id, &item.causal_context, item.value.clone());
ent
})?;
// Propagate to rest of network
if let Some(updated) = new {
self.item_table.insert(&updated).await?;
}
Ok(K2VRpc::Ok)
}
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
for i in items.iter() {
self.handle_insert(i).await?;
}
Ok(K2VRpc::Ok)
})
}
async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result<K2VItem, Error> {