diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f016cb8c..efe052a8 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -245,12 +245,42 @@ impl K2VRpcHandler { // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { + let new = self.local_insert(item)?; + + // Propagate to rest of network + if let Some(updated) = new { + self.item_table.insert(&updated).await?; + } + + Ok(K2VRpc::Ok) + } + + async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { + let mut updated_vec = vec![]; + + for item in items { + let new = self.local_insert(item)?; + + if let Some(updated) = new { + updated_vec.push(updated); + } + } + + // Propagate to rest of network + if !updated_vec.is_empty() { + self.item_table.insert_many(&updated_vec).await?; + } + + Ok(K2VRpc::Ok) + } + + fn local_insert(&self, item: &InsertedItem) -> Result, Error> { let tree_key = self .item_table .data .tree_key(&item.partition, &item.sort_key); - let new = self - .item_table + + self.item_table .data .update_entry_with(&tree_key[..], |ent| { let mut ent = ent.unwrap_or_else(|| { @@ -262,21 +292,7 @@ impl K2VRpcHandler { }); ent.update(self.system.id, &item.causal_context, item.value.clone()); ent - })?; - - // Propagate to rest of network - if let Some(updated) = new { - self.item_table.insert(&updated).await?; - } - - Ok(K2VRpc::Ok) - } - - async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { - for i in items.iter() { - self.handle_insert(i).await?; - } - Ok(K2VRpc::Ok) + }) } async fn handle_poll(&self, key: &PollKey, ct: &str) -> Result {