diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 36e8172b..4318a064 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -4,6 +4,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; @@ -11,6 +12,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -171,11 +173,13 @@ impl IndexCounter { ), }); - let this2 = this.clone(); - background.spawn_worker( - format!("{} index counter propagator", T::COUNTER_TABLE_NAME), - move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), - ); + background.spawn_worker(IndexPropagatorWorker { + index_counter: this.clone(), + propagate_rx, + buf: HashMap::new(), + errors: 0, + }); + this } @@ -239,68 +243,6 @@ impl IndexCounter { Ok(()) } - async fn propagate_loop( - self: Arc, - mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, - must_exit: watch::Receiver, - ) { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let mut buf = HashMap::new(); - let mut errors = 0; - - loop { - let (ent, closed) = match propagate_rx.try_recv() { - Ok(ent) => (Some(ent), false), - Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { - match propagate_rx.recv().await { - Some(ent) => (Some(ent), false), - None => (None, true), - } - } - Err(mpsc::error::TryRecvError::Empty) => (None, false), - Err(mpsc::error::TryRecvError::Disconnected) => (None, true), - }; - - if let Some((pk, sk, counters)) = ent { - let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.this_node); - match buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - // As long as we can add entries, loop back and add them to batch - // before sending batch to other nodes - continue; - } - - if !buf.is_empty() { - let entries = buf.iter().map(|(_k, v)| v); - if let Err(e) = self.table.insert_many(entries).await { - errors += 1; - if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); - break; - } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - - buf.clear(); - errors = 0; - } - - if closed || *must_exit.borrow() { - break; - } - } - } - pub fn offline_recount_all( &self, counted_table: &Arc>, @@ -437,6 +379,86 @@ impl IndexCounter { } } +struct IndexPropagatorWorker { + index_counter: Arc>, + propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, + + buf: HashMap, CounterEntry>, + errors: usize, +} + +impl IndexPropagatorWorker { + fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry) { + let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry(self.index_counter.this_node); + match self.buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + } +} + +#[async_trait] +impl Worker for IndexPropagatorWorker { + fn name(&self) -> String { + format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + } + + async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). + let closed = loop { + match self.propagate_rx.try_recv() { + Ok((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + } + Err(mpsc::error::TryRecvError::Empty) => break false, + Err(mpsc::error::TryRecvError::Disconnected) => break true, + } + }; + + if !self.buf.is_empty() { + let entries = self.buf.iter().map(|(_k, v)| v); + if let Err(e) = self.index_counter.table.insert_many(entries).await { + self.errors += 1; + if self.errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); + return Ok(WorkerStatus::Done); + } + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, self.buf.len(), e, self.errors); + tokio::time::sleep(Duration::from_secs(5)).await; + return Ok(WorkerStatus::Busy); + } else { + self.buf.clear(); + self.errors = 0; + } + + return Ok(WorkerStatus::Busy); + } else if closed { + return Ok(WorkerStatus::Done); + } else { + return Ok(WorkerStatus::Idle); + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerStatus { + match self.propagate_rx.recv().await { + Some((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + WorkerStatus::Busy + } + None => match self.buf.is_empty() { + false => WorkerStatus::Busy, + true => WorkerStatus::Done, + }, + } + } +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry { pk: T::CP,