diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 14db3523..23e13109 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,8 +1,10 @@ use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch}; use garage_rpc::ring::Ring; use garage_rpc::system::System; @@ -134,6 +136,7 @@ impl TableSchema for CounterTable { pub struct IndexCounter { this_node: Uuid, local_counter: sled::Tree, + propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc, TableShardedReplication>>, } @@ -143,11 +146,16 @@ impl IndexCounter { replication: TableShardedReplication, db: &sled::Db, ) -> Arc { - Arc::new(Self { + let background = system.background.clone(); + + let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); + + let this = Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter:{}", T::NAME)) .expect("Unable to open local counter tree"), + propagate_tx, table: Table::new( CounterTable { _phantom_t: Default::default(), @@ -156,7 +164,14 @@ impl IndexCounter { system, db, ), - }) + }); + + let this2 = this.clone(); + background.spawn_worker( + format!("{} index counter propagator", T::NAME), + move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), + ); + this } pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { @@ -188,19 +203,68 @@ impl IndexCounter { Ok(entry) })?; - let table = self.table.clone(); - let this_node = self.this_node; - let pk = pk.clone(); - let sk = sk.clone(); - tokio::spawn(async move { - let dist_entry = new_entry.into_counter_entry::(this_node, pk, sk); - if let Err(e) = table.insert(&dist_entry).await { - warn!("({}) Could not propagate counter value: {}", T::NAME, e); - } - }); + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + error!( + "Could not propagate updated counter values, failed to send to channel: {}", + e + ); + } Ok(()) } + + async fn propagate_loop( + self: Arc, + mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + must_exit: watch::Receiver, + ) { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). + let mut buf = vec![]; + let mut errors = 0; + + loop { + let (ent, closed) = match propagate_rx.try_recv() { + Ok(ent) => (Some(ent), false), + Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { + match propagate_rx.recv().await { + Some(ent) => (Some(ent), false), + None => (None, true), + } + } + Err(mpsc::error::TryRecvError::Empty) => (None, false), + Err(mpsc::error::TryRecvError::Disconnected) => (None, true), + }; + + if let Some((pk, sk, counters)) = ent { + let dist_entry = counters.into_counter_entry::(self.this_node, pk, sk); + buf.push(dist_entry); + // As long as we can add entries, loop back and add them to batch + // before sending batch to other nodes + continue; + } + + if !buf.is_empty() { + if let Err(e) = self.table.insert_many(&buf[..]).await { + errors += 1; + if errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + break; + } + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + buf.clear(); + errors = 0; + } + + if closed || *must_exit.borrow() { + break; + } + } + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index b21c78d3..8b7cc08a 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -222,6 +222,7 @@ impl TableSchema for K2VItemTable { type Filter = ItemFilter; fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + // 1. Count let (old_entries, old_conflicts, old_values, old_bytes) = match old { None => (0, 0, 0, 0), Some(e) => e.stats(), @@ -251,6 +252,7 @@ impl TableSchema for K2VItemTable { error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); } + // 2. Notify if let Some(new_ent) = new { self.subscriptions.notify(new_ent); }