Batch propagation of index counter updates
parent 17883bbe4c
commit 41b58d7e25
2 changed files with 78 additions and 12 deletions
@@ -1,8 +1,10 @@
 use std::collections::{BTreeMap, HashMap};
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::time::Duration;
 
 use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
 
 use garage_rpc::ring::Ring;
 use garage_rpc::system::System;
@@ -134,6 +136,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
 pub struct IndexCounter<T: CounterSchema> {
     this_node: Uuid,
     local_counter: sled::Tree,
+    propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
     pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
 }
 
@@ -143,11 +146,16 @@ impl<T: CounterSchema> IndexCounter<T> {
         replication: TableShardedReplication,
         db: &sled::Db,
     ) -> Arc<Self> {
-        Arc::new(Self {
+        let background = system.background.clone();
+
+        let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
+
+        let this = Arc::new(Self {
             this_node: system.id,
             local_counter: db
                 .open_tree(format!("local_counter:{}", T::NAME))
                 .expect("Unable to open local counter tree"),
+            propagate_tx,
             table: Table::new(
                 CounterTable {
                     _phantom_t: Default::default(),
@@ -156,7 +164,14 @@ impl<T: CounterSchema> IndexCounter<T> {
                 system,
                 db,
             ),
-        })
+        });
+
+        let this2 = this.clone();
+        background.spawn_worker(
+            format!("{} index counter propagator", T::NAME),
+            move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
+        );
+        this
     }
 
     pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
@@ -188,19 +203,68 @@ impl<T: CounterSchema> IndexCounter<T> {
             Ok(entry)
         })?;
 
-        let table = self.table.clone();
-        let this_node = self.this_node;
-        let pk = pk.clone();
-        let sk = sk.clone();
-        tokio::spawn(async move {
-            let dist_entry = new_entry.into_counter_entry::<T>(this_node, pk, sk);
-            if let Err(e) = table.insert(&dist_entry).await {
-                warn!("({}) Could not propagate counter value: {}", T::NAME, e);
-            }
-        });
+        if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+            error!(
+                "Could not propagate updated counter values, failed to send to channel: {}",
+                e
+            );
+        }
 
         Ok(())
     }
+
+    async fn propagate_loop(
+        self: Arc<Self>,
+        mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+        must_exit: watch::Receiver<bool>,
+    ) {
+        // This loop batches updates to counters to be sent all at once.
+        // They are sent once the propagate_rx channel has been emptied (or is closed).
+        let mut buf = vec![];
+        let mut errors = 0;
+
+        loop {
+            let (ent, closed) = match propagate_rx.try_recv() {
+                Ok(ent) => (Some(ent), false),
+                Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
+                    match propagate_rx.recv().await {
+                        Some(ent) => (Some(ent), false),
+                        None => (None, true),
+                    }
+                }
+                Err(mpsc::error::TryRecvError::Empty) => (None, false),
+                Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
+            };
+
+            if let Some((pk, sk, counters)) = ent {
+                let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+                buf.push(dist_entry);
+                // As long as we can add entries, loop back and add them to batch
+                // before sending batch to other nodes
+                continue;
+            }
+
+            if !buf.is_empty() {
+                if let Err(e) = self.table.insert_many(&buf[..]).await {
+                    errors += 1;
+                    if errors >= 2 && *must_exit.borrow() {
+                        error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+                        break;
+                    }
+                    warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+                    tokio::time::sleep(Duration::from_secs(5)).await;
+                    continue;
+                }
+
+                buf.clear();
+                errors = 0;
+            }
+
+            if closed || *must_exit.borrow() {
+                break;
+            }
+        }
+    }
 }
 
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -222,6 +222,7 @@ impl TableSchema for K2VItemTable {
     type Filter = ItemFilter;
 
     fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+        // 1. Count
         let (old_entries, old_conflicts, old_values, old_bytes) = match old {
             None => (0, 0, 0, 0),
             Some(e) => e.stats(),
@@ -251,6 +252,7 @@ impl TableSchema for K2VItemTable {
             error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
         }
 
+        // 2. Notify
        if let Some(new_ent) = new {
            self.subscriptions.notify(new_ent);
        }
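
The new propagate_loop uses a drain-then-flush pattern: it pulls entries with try_recv while more are immediately available, blocks on recv only when the buffer is empty, and then writes the whole buffer with a single insert_many call. Below is a minimal, self-contained sketch of that pattern using tokio's unbounded mpsc channel; the u64 payload and the flush function are illustrative stand-ins (not Garage code) for the counter entries and table.insert_many.

use std::time::Duration;
use tokio::sync::mpsc;

// Illustrative stand-in for a batched write such as table.insert_many().
async fn flush(batch: &[u64]) -> Result<(), String> {
    println!("flushing {} entries", batch.len());
    Ok(())
}

async fn batching_loop(mut rx: mpsc::UnboundedReceiver<u64>) {
    let mut buf = Vec::new();
    loop {
        // Opportunistically drain the channel; block only when the buffer is empty.
        let (item, closed) = match rx.try_recv() {
            Ok(v) => (Some(v), false),
            Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => match rx.recv().await {
                Some(v) => (Some(v), false),
                None => (None, true),
            },
            Err(mpsc::error::TryRecvError::Empty) => (None, false),
            Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
        };

        if let Some(v) = item {
            // Keep accumulating while more items are immediately available.
            buf.push(v);
            continue;
        }

        if !buf.is_empty() {
            if let Err(e) = flush(&buf).await {
                eprintln!("flush failed: {}, retrying in 5 seconds", e);
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }
            buf.clear();
        }

        if closed {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    for i in 0..10u64 {
        tx.send(i).unwrap();
    }
    // Dropping the sender closes the channel, letting the loop flush and exit.
    drop(tx);
    batching_loop(rx).await;
}

Compared to the previous code, which spawned one tokio task and issued one table.insert per counter update, the single background worker coalesces bursts of updates into one batched write and retries the whole batch on failure.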