diff --git a/src/block/manager.rs b/src/block/manager.rs
index abc5063d..b7dcaf8a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -289,28 +289,41 @@ impl BlockManager {
     /// Increment the number of times a block is used, putting it in the
     /// resync queue if it is required but not known
-    pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
-        if self.rc.block_incref(hash)? {
+    pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
+        if self.rc.block_incref(tx, &hash)? {
             // When the reference counter is incremented, there is
             // normally a node that is responsible for sending us the
             // data of the block. However that operation may fail,
             // so in all cases we add the block here to the todo list
             // to check later that it arrived correctly, and if not
             // we will fetch it from someone.
-            self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+            let this = self.clone();
+            tokio::spawn(async move {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+                    error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+                }
+            });
         }
         Ok(())
     }

     /// Decrement the number of times a block is used
-    pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
-        if self.rc.block_decref(hash)? {
+    pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
+        if self.rc.block_decref(tx, &hash)? {
             // When the RC is decremented, it might drop to zero,
             // indicating that we don't need the block.
             // There is a delay before we garbage collect it;
             // make sure that it is handled in the resync loop
             // after that delay has passed.
-            self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
+            let this = self.clone();
+            tokio::spawn(async move {
+                tokio::time::sleep(Duration::from_secs(1)).await;
+                if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10))
+                {
+                    error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+                }
+            });
         }
         Ok(())
     }
@@ -510,12 +523,12 @@
         });
     }

-    fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> {
+    fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
         let when = now_msec() + delay.as_millis() as u64;
         self.put_to_resync_at(hash, when)
     }

-    fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> {
+    fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> {
         trace!("Put resync_queue: {} {:?}", when, hash);
         let mut key = u64::to_be_bytes(when).to_vec();
         key.extend(hash.as_ref());
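The `put_to_resync` call can no longer happen synchronously inside `block_incref`/`block_decref`: those functions now run within the caller's open transaction, while `put_to_resync` writes to a different tree, so the diff defers it to a spawned task. A minimal, self-contained sketch of that deferral pattern follows; `Manager` and `enqueue` are hypothetical stand-ins, not Garage's API.

```rust
// Sketch: defer a fallible side effect out of an open transaction.
// Requires tokio with the "rt", "macros" and "time" features enabled.
use std::sync::{Arc, Mutex};
use std::time::Duration;

struct Manager {
    resync_queue: Mutex<Vec<u64>>,
}

impl Manager {
    // Stands in for `put_to_resync`: a fallible write to another tree.
    fn enqueue(&self, item: u64) -> Result<(), String> {
        self.resync_queue
            .lock()
            .map_err(|_| "lock poisoned".to_string())?
            .push(item);
        Ok(())
    }

    // Called while a transaction is open elsewhere: clone the Arc, wait a
    // moment, then perform the write outside the transaction. Errors can
    // only be logged, since the caller has long since returned.
    fn defer_enqueue(self: &Arc<Self>, item: u64) {
        let this = self.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if let Err(e) = this.enqueue(item) {
                eprintln!("item {} could not be queued: {}", item, e);
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let m = Arc::new(Manager {
        resync_queue: Mutex::new(Vec::new()),
    });
    m.defer_enqueue(42);
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("queued: {:?}", m.resync_queue.lock().unwrap());
}
```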
diff --git a/src/block/rc.rs b/src/block/rc.rs
index e0b952fd..42cdf241 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -19,35 +19,29 @@ impl BlockRc {
     /// Increment the reference counter associated to a hash.
     /// Returns true if the RC goes from zero to nonzero.
-    pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
-        let old_rc = self.rc.db().transaction(|mut tx| {
-            let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
-            match old_rc.increment().serialize() {
-                Some(x) => {
-                    tx.insert(&self.rc, &hash, x)?;
-                }
-                None => unreachable!(),
-            };
-            tx.commit(old_rc)
-        })?;
+    pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
+        let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+        match old_rc.increment().serialize() {
+            Some(x) => {
+                tx.insert(&self.rc, &hash, x)?;
+            }
+            None => unreachable!(),
+        };
         Ok(old_rc.is_zero())
     }

     /// Decrement the reference counter associated to a hash.
     /// Returns true if the RC is now zero.
-    pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
-        let new_rc = self.rc.db().transaction(|mut tx| {
-            let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
-            match new_rc.serialize() {
-                Some(x) => {
-                    tx.insert(&self.rc, &hash, x)?;
-                }
-                None => {
-                    tx.remove(&self.rc, &hash)?;
-                }
-            };
-            tx.commit(new_rc)
-        })?;
+    pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
+        let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+        match new_rc.serialize() {
+            Some(x) => {
+                tx.insert(&self.rc, &hash, x)?;
+            }
+            None => {
+                tx.remove(&self.rc, &hash)?;
+            }
+        };
         Ok(matches!(new_rc, RcEntry::Deletable { .. }))
     }
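With the surrounding transaction gone from `rc.rs`, incref and decref become plain read-modify-write steps on the caller's transaction, and their boolean results signal the zero/nonzero transitions the block manager reacts to. Below is a hedged sketch of the state machine implied by `RcEntry`; only the `Deletable` variant appears in the diff, so the `Absent`/`Present` variants and their payloads are assumptions for illustration.

```rust
// Assumed shape of the reference-counter entry; not Garage's exact definition.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RcEntry {
    Absent,                     // no entry in the tree
    Present { count: u64 },     // block is referenced `count` times
    Deletable { at_time: u64 }, // RC reached zero; GC after `at_time`
}

impl RcEntry {
    fn is_zero(&self) -> bool {
        !matches!(self, RcEntry::Present { .. })
    }

    fn increment(self) -> Self {
        match self {
            RcEntry::Present { count } => RcEntry::Present { count: count + 1 },
            _ => RcEntry::Present { count: 1 },
        }
    }

    fn decrement(self, now: u64) -> Self {
        match self {
            RcEntry::Present { count } if count > 1 => RcEntry::Present { count: count - 1 },
            RcEntry::Present { .. } => RcEntry::Deletable { at_time: now },
            other => other,
        }
    }
}

fn main() {
    let rc = RcEntry::Absent;
    assert!(rc.is_zero());
    let rc = rc.increment(); // zero -> nonzero: someone must send us the block
    assert!(!rc.is_zero());
    let rc = rc.decrement(1000); // nonzero -> zero: schedule for GC
    assert_eq!(rc, RcEntry::Deletable { at_time: 1000 });
}
```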
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 6f81be3e..4fec1138 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -116,8 +116,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
     type E = CounterEntry<T>;
     type Filter = (DeletedFilter, Vec<Uuid>);

-    fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        _tx: &mut db::Transaction,
+        _old: Option<&Self::E>,
+        _new: Option<&Self::E>,
+    ) -> db::Result<()> {
        // nothing for now
+        Ok(())
     }

     fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
@@ -176,36 +182,36 @@ impl<T: CounterSchema> IndexCounter<T> {
         this
     }

-    pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+    pub fn count(
+        &self,
+        tx: &mut db::Transaction,
+        pk: &T::P,
+        sk: &T::S,
+        counts: &[(&str, i64)],
+    ) -> db::TxResult<(), Error> {
         let tree_key = self.table.data.tree_key(pk, sk);

-        let new_entry = self.local_counter.db().transaction(|mut tx| {
-            let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
-                Some(old_bytes) => {
-                    rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
-                        .map_err(Error::RmpDecode)
-                        .map_err(db::TxError::Abort)?
-                }
-                None => LocalCounterEntry {
-                    values: BTreeMap::new(),
-                },
-            };
+        let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+            Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+                .map_err(Error::RmpDecode)
+                .map_err(db::TxError::Abort)?,
+            None => LocalCounterEntry {
+                values: BTreeMap::new(),
+            },
+        };

-            for (s, inc) in counts.iter() {
-                let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
-                ent.0 += 1;
-                ent.1 += *inc;
-            }
+        for (s, inc) in counts.iter() {
+            let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+            ent.0 += 1;
+            ent.1 += *inc;
+        }

-            let new_entry_bytes = rmp_to_vec_all_named(&entry)
-                .map_err(Error::RmpEncode)
-                .map_err(db::TxError::Abort)?;
-            tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
+        let new_entry_bytes = rmp_to_vec_all_named(&entry)
+            .map_err(Error::RmpEncode)
+            .map_err(db::TxError::Abort)?;
+        tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;

-            Ok(entry)
-        })?;
-
-        if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+        if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
             error!(
                 "Could not propagate updated counter values, failed to send to channel: {}",
                 e
             );
         }
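`count()` now does its read-modify-write on the transaction it is handed, so a counter update commits or aborts together with the table write that caused it. The merge itself is simple: each named counter stores a pair of (number of updates, running sum). A standalone illustration of that merge, using a hypothetical `apply_counts` helper written for this sketch:

```rust
use std::collections::BTreeMap;

// Mirrors the loop in `count()` above: for each named counter, bump the
// update count and add the (possibly negative) increment to the running sum.
fn apply_counts(values: &mut BTreeMap<String, (u64, i64)>, counts: &[(&str, i64)]) {
    for (name, inc) in counts {
        let ent = values.entry(name.to_string()).or_insert((0, 0));
        ent.0 += 1;
        ent.1 += *inc;
    }
}

fn main() {
    let mut values = BTreeMap::new();
    apply_counts(&mut values, &[("entries", 1), ("bytes", 1024)]);
    apply_counts(&mut values, &[("bytes", -512)]);
    // "bytes" was updated twice; its running sum is 1024 - 512 = 512.
    assert_eq!(values["bytes"], (2, 512));
    println!("{:?}", values);
}
```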
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 8b7cc08a..77446f64 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::sync::Arc;

+use garage_db as db;
 use garage_util::data::*;

 use garage_table::crdt::*;
@@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
     type E = K2VItem;
     type Filter = ItemFilter;

-    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        tx: &mut db::Transaction,
+        old: Option<&Self::E>,
+        new: Option<&Self::E>,
+    ) -> db::Result<()> {
         // 1. Count
         let (old_entries, old_conflicts, old_values, old_bytes) = match old {
             None => (0, 0, 0, 0),
@@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
             .map(|e| &e.partition.partition_key)
             .unwrap_or_else(|| &new.unwrap().partition.partition_key);

-        if let Err(e) = self.counter_table.count(
+        match self.counter_table.count(
+            tx,
             &count_pk,
             count_sk,
             &[
@@ -249,13 +256,25 @@ impl TableSchema for K2VItemTable {
                 (BYTES, new_bytes - old_bytes),
             ],
         ) {
-            error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+            Ok(()) => (),
+            Err(db::TxError::Db(e)) => return Err(e),
+            Err(db::TxError::Abort(e)) => {
+                // This result can be returned by `counter_table.count()` for instance
+                // if messagepack serialization or deserialization fails at some step.
+                // Warn admin but ignore this error for now, that's all we can do.
+                error!(
+                    "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
+                    count_pk, count_sk, e
+                );
+            }
         }

         // 2. Notify
         if let Some(new_ent) = new {
             self.subscriptions.notify(new_ent);
         }
+
+        Ok(())
     }

     #[allow(clippy::nonminimal_bool)]
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index 9b3991bf..2c06bc96 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,6 +1,8 @@
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;

+use garage_db as db;
+
 use garage_util::data::*;

 use garage_table::crdt::Crdt;
@@ -51,21 +53,23 @@ impl TableSchema for BlockRefTable {
     type E = BlockRef;
     type Filter = DeletedFilter;

-    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        tx: &mut db::Transaction,
+        old: Option<&Self::E>,
+        new: Option<&Self::E>,
+    ) -> db::Result<()> {
         #[allow(clippy::or_fun_call)]
-        let block = &old.or(new).unwrap().block;
+        let block = old.or(new).unwrap().block;
         let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
         let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
         if is_after && !was_before {
-            if let Err(e) = self.block_manager.block_incref(block) {
-                warn!("block_incref failed for block {:?}: {}", block, e);
-            }
+            self.block_manager.block_incref(tx, block)?;
         }
         if was_before && !is_after {
-            if let Err(e) = self.block_manager.block_decref(block) {
-                warn!("block_decref failed for block {:?}: {}", block, e);
-            }
+            self.block_manager.block_decref(tx, block)?;
         }
+        Ok(())
     }

     fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 3d9a89f7..f3bd9892 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::sync::Arc;

+use garage_db as db;
+
 use garage_util::background::BackgroundRunner;
 use garage_util::data::*;
@@ -232,7 +234,12 @@ impl TableSchema for ObjectTable {
     type E = Object;
     type Filter = ObjectFilter;

-    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        _tx: &mut db::Transaction,
+        old: Option<&Self::E>,
+        new: Option<&Self::E>,
+    ) -> db::Result<()> {
         let version_table = self.version_table.clone();
         let old = old.cloned();
         let new = new.cloned();
@@ -259,7 +266,8 @@ impl TableSchema for ObjectTable {
                 }
             }
             Ok(())
-        })
+        });
+        Ok(())
     }

     fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index ad096772..d168c2c2 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,6 +1,8 @@
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;

+use garage_db as db;
+
 use garage_util::background::BackgroundRunner;
 use garage_util::data::*;
@@ -137,7 +139,12 @@ impl TableSchema for VersionTable {
     type E = Version;
     type Filter = DeletedFilter;

-    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+    fn updated(
+        &self,
+        _tx: &mut db::Transaction,
+        old: Option<&Self::E>,
+        new: Option<&Self::E>,
+    ) -> db::Result<()> {
         let block_ref_table = self.block_ref_table.clone();
         let old = old.cloned();
         let new = new.cloned();
@@ -160,7 +167,9 @@ impl TableSchema for VersionTable {
                 }
             }
             Ok(())
-        })
+        });
+
+        Ok(())
     }

     fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
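Of the four schemas above, only `K2VItemTable` and `BlockRefTable` actually use the transaction; `ObjectTable` and `VersionTable` keep queuing their follow-up work on the background runner and simply return `Ok(())`. The block-ref trigger is edge-based: incref fires only when an entry transitions to holding a live reference, decref only on the reverse transition, and nothing happens when the state is unchanged. A standalone sketch of that logic (simplified types, not Garage's):

```rust
#[derive(Clone, Copy)]
struct BlockRef {
    deleted: bool,
}

// Returns (should_incref, should_decref), mirroring the was_before/is_after
// computation in BlockRefTable::updated.
fn transition(old: Option<BlockRef>, new: Option<BlockRef>) -> (bool, bool) {
    let was_before = old.map(|x| !x.deleted).unwrap_or(false);
    let is_after = new.map(|x| !x.deleted).unwrap_or(false);
    (is_after && !was_before, was_before && !is_after)
}

fn main() {
    let live = BlockRef { deleted: false };
    let dead = BlockRef { deleted: true };
    assert_eq!(transition(None, Some(live)), (true, false)); // new ref: incref
    assert_eq!(transition(Some(live), Some(dead)), (false, true)); // deleted: decref
    assert_eq!(transition(Some(live), Some(live)), (false, false)); // no change
    println!("ok");
}
```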
diff --git a/src/table/data.rs b/src/table/data.rs
index 6352ac24..e688168f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -209,17 +209,20 @@ where
                 let new_bytes_hash = blake2sum(&new_bytes[..]);
                 tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
                 tx.insert(&self.store, tree_key, new_bytes)?;
-                Ok(Some((old_entry, new_entry, new_bytes_hash)))
+
+                self.instance
+                    .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
+
+                Ok(Some((new_entry, new_bytes_hash)))
             } else {
                 Ok(None)
             }
         })?;

-        if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+        if let Some((new_entry, new_bytes_hash)) = changed {
             self.metrics.internal_update_counter.add(1);

             let is_tombstone = new_entry.is_tombstone();
-            self.instance.updated(old_entry.as_ref(), Some(&new_entry));
             self.merkle_todo_notify.notify_one();
             if is_tombstone {
                 // We are only responsible for GC'ing this item if we are the
@@ -242,20 +245,23 @@
     }

     pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
-        let removed = self.store.db().transaction(|mut tx| {
-            let remove = matches!(tx.get(&self.store, k)?, Some(cur_v) if cur_v == v);
-            if remove {
-                tx.remove(&self.store, k)?;
-                tx.insert(&self.merkle_todo, k, vec![])?;
-            }
-            Ok(remove)
-        })?;
+        let removed = self
+            .store
+            .db()
+            .transaction(|mut tx| match tx.get(&self.store, k)? {
+                Some(cur_v) if cur_v == v => {
+                    tx.remove(&self.store, k)?;
+                    tx.insert(&self.merkle_todo, k, vec![])?;
+
+                    let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+                    self.instance.updated(&mut tx, Some(&old_entry), None)?;
+                    Ok(true)
+                }
+                _ => Ok(false),
+            })?;

         if removed {
             self.metrics.internal_delete_counter.add(1);
-
-            let old_entry = self.decode_entry(v)?;
-            self.instance.updated(Some(&old_entry), None);
             self.merkle_todo_notify.notify_one();
         }
         Ok(removed)
@@ -266,26 +272,26 @@
         k: &[u8],
         vhash: Hash,
     ) -> Result<bool, Error> {
-        let removed = self.store.db().transaction(|mut tx| {
-            let remove_v = match tx.get(&self.store, k)? {
-                Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v),
-                _ => None,
-            };
-            if remove_v.is_some() {
-                tx.remove(&self.store, k)?;
-                tx.insert(&self.merkle_todo, k, vec![])?;
-            }
-            Ok(remove_v)
-        })?;
+        let removed = self
+            .store
+            .db()
+            .transaction(|mut tx| match tx.get(&self.store, k)? {
+                Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+                    tx.remove(&self.store, k)?;
+                    tx.insert(&self.merkle_todo, k, vec![])?;

-        if let Some(old_v) = removed {
-            let old_entry = self.decode_entry(&old_v[..])?;
-            self.instance.updated(Some(&old_entry), None);
+                    let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+                    self.instance.updated(&mut tx, Some(&old_entry), None)?;
+                    Ok(true)
+                }
+                _ => Ok(false),
+            })?;
+
+        if removed {
+            self.metrics.internal_delete_counter.add(1);
             self.merkle_todo_notify.notify_one();
-            Ok(true)
-        } else {
-            Ok(false)
         }
+        Ok(removed)
     }

     // ---- Utility functions ----
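This is the heart of the change: `updated()` is now invoked from inside the same transaction that inserts or removes the entry, so a failing trigger aborts the whole write instead of leaving the item and its side effects out of sync. A toy model of that atomicity follows; `Tx` and `run_tx` are illustrations for this sketch, not garage_db's actual types.

```rust
// A toy "transaction" that buffers writes and applies them only if every
// step, including the trigger, succeeded.
struct Tx {
    ops: Vec<String>,
}

fn run_tx<F>(store: &mut Vec<String>, body: F) -> Result<(), String>
where
    F: FnOnce(&mut Tx) -> Result<(), String>,
{
    let mut tx = Tx { ops: Vec::new() };
    body(&mut tx)?; // any error: buffered ops are dropped, nothing applied
    store.append(&mut tx.ops); // "commit": apply everything at once
    Ok(())
}

fn main() {
    let mut store = Vec::new();

    // Item write and its index side effect commit together...
    run_tx(&mut store, |tx| {
        tx.ops.push("insert item".into());
        tx.ops.push("update index counter".into()); // the `updated()` trigger
        Ok(())
    })
    .unwrap();

    // ...or are rolled back together when the trigger fails.
    let res = run_tx(&mut store, |tx| {
        tx.ops.push("insert item".into());
        Err("trigger failed".into())
    });
    assert!(res.is_err());
    assert_eq!(store.len(), 2); // the failed insert left no trace

    println!("{:?}", store);
}
```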
If such actions + /// include updates to the local database that should be applied + /// atomically with the item update itself, a db transaction is + /// provided on which these changes should be done. + /// This function can return a DB error but that's all. + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + _new: Option<&Self::E>, + ) -> db::Result<()> { + Ok(()) + } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; }