diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 23e13109..123154d4 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -220,7 +220,7 @@ impl IndexCounter { ) { // This loop batches updates to counters to be sent all at once. // They are sent once the propagate_rx channel has been emptied (or is closed). - let mut buf = vec![]; + let mut buf = HashMap::new(); let mut errors = 0; loop { @@ -237,15 +237,24 @@ impl IndexCounter { }; if let Some((pk, sk, counters)) = ent { + let tree_key = self.table.data.tree_key(&pk, &sk); let dist_entry = counters.into_counter_entry::(self.this_node, pk, sk); - buf.push(dist_entry); + match buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } // As long as we can add entries, loop back and add them to batch // before sending batch to other nodes continue; } if !buf.is_empty() { - if let Err(e) = self.table.insert_many(&buf[..]).await { + let entries = buf.iter().map(|(_k, v)| v); + if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); diff --git a/src/table/table.rs b/src/table/table.rs index e26eb215..2a167604 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -1,3 +1,4 @@ +use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use std::time::Duration; @@ -130,9 +131,13 @@ where Ok(()) } - pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + pub async fn insert_many(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator + Send + Sync, + IE: Borrow + Send + Sync, + { let tracer = opentelemetry::global::tracer("garage_table"); - let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len())); + let span = tracer.start(format!("{} insert_many", F::TABLE_NAME)); self.insert_many_internal(entries) .bound_record_duration(&self.data.metrics.put_request_duration) @@ -144,10 +149,15 @@ where Ok(()) } - async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> { + async fn insert_many_internal(&self, entries: I) -> Result<(), Error> + where + I: IntoIterator + Send + Sync, + IE: Borrow + Send + Sync, + { let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); - for entry in entries.iter() { + for entry in entries.into_iter() { + let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));