New worker for index counter propagator
parent 269f996fd0
commit b8338dea56
1 changed file with 89 additions and 67 deletions
@@ -4,6 +4,7 @@ use std::marker::PhantomData;
 use std::sync::Arc;
 use std::time::Duration;
 
+use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 use tokio::sync::{mpsc, watch};
 
@@ -11,6 +12,7 @@ use garage_db as db;
 
 use garage_rpc::ring::Ring;
 use garage_rpc::system::System;
+use garage_util::background::*;
 use garage_util::data::*;
 use garage_util::error::*;
 use garage_util::time::*;
@@ -171,11 +173,13 @@ impl<T: CountedItem> IndexCounter<T> {
 			),
 		});
 
-		let this2 = this.clone();
-		background.spawn_worker(
-			format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
-			move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
-		);
+		background.spawn_worker(IndexPropagatorWorker {
+			index_counter: this.clone(),
+			propagate_rx,
+			buf: HashMap::new(),
+			errors: 0,
+		});
+
 		this
 	}
 
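Note: the closure previously passed to `spawn_worker` is replaced above by a value implementing the background `Worker` trait (the new `IndexPropagatorWorker` further down in this diff). As a reading aid, here is a minimal sketch of the trait surface that implementation has to satisfy, inferred from the methods it defines below; the actual definitions live in `garage_util::background` and may differ in naming and detail.

// Sketch only: reconstructed from the impl later in this diff, not copied from
// garage_util::background. Treat names and exact signatures as assumptions.
use async_trait::async_trait;
use garage_util::error::Error;
use tokio::sync::watch;

pub enum WorkerStatus {
	Busy, // made progress, call work() again right away
	Idle, // nothing buffered, park in wait_for_work() until new input arrives
	Done, // input channel closed and buffer flushed, worker can stop
}

#[async_trait]
pub trait Worker: Send + Sync + 'static {
	fn name(&self) -> String;

	// One scheduling step: drain pending input, try to flush the batch.
	async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;

	// Called after Idle: block until there is something to do again.
	async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
}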
@@ -239,68 +243,6 @@ impl<T: CountedItem> IndexCounter<T> {
 		Ok(())
 	}
 
-	async fn propagate_loop(
-		self: Arc<Self>,
-		mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
-		must_exit: watch::Receiver<bool>,
-	) {
-		// This loop batches updates to counters to be sent all at once.
-		// They are sent once the propagate_rx channel has been emptied (or is closed).
-		let mut buf = HashMap::new();
-		let mut errors = 0;
-
-		loop {
-			let (ent, closed) = match propagate_rx.try_recv() {
-				Ok(ent) => (Some(ent), false),
-				Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
-					match propagate_rx.recv().await {
-						Some(ent) => (Some(ent), false),
-						None => (None, true),
-					}
-				}
-				Err(mpsc::error::TryRecvError::Empty) => (None, false),
-				Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
-			};
-
-			if let Some((pk, sk, counters)) = ent {
-				let tree_key = self.table.data.tree_key(&pk, &sk);
-				let dist_entry = counters.into_counter_entry(self.this_node);
-				match buf.entry(tree_key) {
-					hash_map::Entry::Vacant(e) => {
-						e.insert(dist_entry);
-					}
-					hash_map::Entry::Occupied(mut e) => {
-						e.get_mut().merge(&dist_entry);
-					}
-				}
-				// As long as we can add entries, loop back and add them to batch
-				// before sending batch to other nodes
-				continue;
-			}
-
-			if !buf.is_empty() {
-				let entries = buf.iter().map(|(_k, v)| v);
-				if let Err(e) = self.table.insert_many(entries).await {
-					errors += 1;
-					if errors >= 2 && *must_exit.borrow() {
-						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
-						break;
-					}
-					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
-					tokio::time::sleep(Duration::from_secs(5)).await;
-					continue;
-				}
-
-				buf.clear();
-				errors = 0;
-			}
-
-			if closed || *must_exit.borrow() {
-				break;
-			}
-		}
-	}
-
 	pub fn offline_recount_all<TS, TR>(
 		&self,
 		counted_table: &Arc<Table<TS, TR>>,
@@ -437,6 +379,86 @@ impl<T: CountedItem> IndexCounter<T> {
 	}
 }
 
+struct IndexPropagatorWorker<T: CountedItem> {
+	index_counter: Arc<IndexCounter<T>>,
+	propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+	buf: HashMap<Vec<u8>, CounterEntry<T>>,
+	errors: usize,
+}
+
+impl<T: CountedItem> IndexPropagatorWorker<T> {
+	fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+		let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
+		let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
+		match self.buf.entry(tree_key) {
+			hash_map::Entry::Vacant(e) => {
+				e.insert(dist_entry);
+			}
+			hash_map::Entry::Occupied(mut e) => {
+				e.get_mut().merge(&dist_entry);
+			}
+		}
+	}
+}
+
+#[async_trait]
+impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+	fn name(&self) -> String {
+		format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+	}
+
+	async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
+		// This loop batches updates to counters to be sent all at once.
+		// They are sent once the propagate_rx channel has been emptied (or is closed).
+		let closed = loop {
+			match self.propagate_rx.try_recv() {
+				Ok((pk, sk, counters)) => {
+					self.add_ent(pk, sk, counters);
+				}
+				Err(mpsc::error::TryRecvError::Empty) => break false,
+				Err(mpsc::error::TryRecvError::Disconnected) => break true,
+			}
+		};
+
+		if !self.buf.is_empty() {
+			let entries = self.buf.iter().map(|(_k, v)| v);
+			if let Err(e) = self.index_counter.table.insert_many(entries).await {
+				self.errors += 1;
+				if self.errors >= 2 && *must_exit.borrow() {
+					error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+					return Ok(WorkerStatus::Done);
+				}
+				warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors);
+				tokio::time::sleep(Duration::from_secs(5)).await;
+				return Ok(WorkerStatus::Busy);
+			} else {
+				self.buf.clear();
+				self.errors = 0;
+			}
+
+			return Ok(WorkerStatus::Busy);
+		} else if closed {
+			return Ok(WorkerStatus::Done);
+		} else {
+			return Ok(WorkerStatus::Idle);
+		}
+	}
+
+	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+		match self.propagate_rx.recv().await {
+			Some((pk, sk, counters)) => {
+				self.add_ent(pk, sk, counters);
+				WorkerStatus::Busy
+			}
+			None => match self.buf.is_empty() {
+				false => WorkerStatus::Busy,
+				true => WorkerStatus::Done,
+			},
+		}
+	}
+}
+
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 struct LocalCounterEntry<T: CountedItem> {
 	pk: T::CP,
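Note: `work` returns a status instead of looping itself because the background runner is what loops. A rough sketch of such a driving loop, building on the trait sketch given after the `spawn_worker` hunk above; the function name `drive` and the loop structure are assumptions for illustration, not Garage's actual runner in `garage_util::background`.

// Hypothetical driver, for illustration only: shows how WorkerStatus is meant
// to be consumed. The real runner also handles logging, tracing and shutdown
// in more detail.
async fn drive<W: Worker>(mut worker: W, mut must_exit: watch::Receiver<bool>) {
	while !*must_exit.borrow() {
		match worker.work(&mut must_exit).await {
			Ok(WorkerStatus::Busy) => continue, // more input or a pending retry: run again
			Ok(WorkerStatus::Done) => break,    // channel closed, batch flushed: retire
			Ok(WorkerStatus::Idle) => {
				// Nothing buffered: block until the next counter update arrives.
				if let WorkerStatus::Done = worker.wait_for_work(&must_exit).await {
					break;
				}
			}
			// Just report and try again; a real runner would log and back off.
			Err(e) => eprintln!("worker {} failed: {}", worker.name(), e),
		}
	}
}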