K2V #293
2 changed files with 27 additions and 8 deletions
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{hash_map, BTreeMap, HashMap};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -220,7 +220,7 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
) {
|
) {
|
||||||
// This loop batches updates to counters to be sent all at once.
|
// This loop batches updates to counters to be sent all at once.
|
||||||
// They are sent once the propagate_rx channel has been emptied (or is closed).
|
// They are sent once the propagate_rx channel has been emptied (or is closed).
|
||||||
let mut buf = vec![];
|
let mut buf = HashMap::new();
|
||||||
let mut errors = 0;
|
let mut errors = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -237,15 +237,24 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some((pk, sk, counters)) = ent {
|
if let Some((pk, sk, counters)) = ent {
|
||||||
|
let tree_key = self.table.data.tree_key(&pk, &sk);
|
||||||
let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
|
let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
|
||||||
buf.push(dist_entry);
|
match buf.entry(tree_key) {
|
||||||
|
hash_map::Entry::Vacant(e) => {
|
||||||
|
e.insert(dist_entry);
|
||||||
|
}
|
||||||
|
hash_map::Entry::Occupied(mut e) => {
|
||||||
|
e.get_mut().merge(&dist_entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
// As long as we can add entries, loop back and add them to batch
|
// As long as we can add entries, loop back and add them to batch
|
||||||
// before sending batch to other nodes
|
// before sending batch to other nodes
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if !buf.is_empty() {
|
if !buf.is_empty() {
|
||||||
if let Err(e) = self.table.insert_many(&buf[..]).await {
|
let entries = buf.iter().map(|(_k, v)| v);
|
||||||
|
if let Err(e) = self.table.insert_many(entries).await {
|
||||||
errors += 1;
|
errors += 1;
|
||||||
if errors >= 2 && *must_exit.borrow() {
|
if errors >= 2 && *must_exit.borrow() {
|
||||||
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
|
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::borrow::Borrow;
|
||||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -130,9 +131,13 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
|
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = IE> + Send + Sync,
|
||||||
|
IE: Borrow<F::E> + Send + Sync,
|
||||||
|
{
|
||||||
let tracer = opentelemetry::global::tracer("garage_table");
|
let tracer = opentelemetry::global::tracer("garage_table");
|
||||||
let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
|
let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
|
||||||
|
|
||||||
self.insert_many_internal(entries)
|
self.insert_many_internal(entries)
|
||||||
.bound_record_duration(&self.data.metrics.put_request_duration)
|
.bound_record_duration(&self.data.metrics.put_request_duration)
|
||||||
|
@ -144,10 +149,15 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
|
async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = IE> + Send + Sync,
|
||||||
|
IE: Borrow<F::E> + Send + Sync,
|
||||||
|
{
|
||||||
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
|
let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
|
||||||
|
|
||||||
for entry in entries.iter() {
|
for entry in entries.into_iter() {
|
||||||
|
let entry = entry.borrow();
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let who = self.data.replication.write_nodes(&hash);
|
let who = self.data.replication.write_nodes(&hash);
|
||||||
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
|
||||||
|
|
Loading…
Reference in a new issue