From 09a3dad0f2c1641e4e300809dbdb3599b32efc03 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 11:35:36 +0100 Subject: [PATCH] Lock once for insert_many --- src/model/k2v/rpc.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 8b070885..04ab3ab9 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -7,7 +7,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Duration; use async_trait::async_trait; @@ -72,7 +72,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc, item_table: Arc>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. local_timestamp_tree: Mutex, + endpoint: Arc>, subscriptions: Arc, } @@ -323,7 +328,10 @@ impl K2VRpcHandler { // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -336,11 +344,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -352,12 +363,11 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result, Error> { - // Using a mutex on the local_timestamp_tree is not strictly necessary, - // but it helps to not try to do several inserts at the same time, - // which would create transaction conflicts and force many useless retries. - let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); - + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result, Error> { let now = now_msec(); self.item_table