Table updated trigger now happens in a transaction, this is way better!
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

This commit is contained in:
Alex 2022-06-06 15:46:00 +02:00
parent c56d858834
commit 1897815358
Signed by: lx
GPG key ID: 0E496D15096376BE
9 changed files with 177 additions and 109 deletions

View file

@ -289,28 +289,41 @@ impl BlockManager {
/// Increment the number of times a block is used, putting it to resynchronization if it is /// Increment the number of times a block is used, putting it to resynchronization if it is
/// required, but not known /// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
if self.rc.block_incref(hash)? { if self.rc.block_incref(tx, &hash)? {
// When the reference counter is incremented, there is // When the reference counter is incremented, there is
// normally a node that is responsible for sending us the // normally a node that is responsible for sending us the
// data of the block. However that operation may fail, // data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list // so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not // to check later that it arrived correctly, and if not
// we will fetch it from someone. // we will fetch it from someone.
self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
});
} }
Ok(()) Ok(())
} }
/// Decrement the number of times a block is used /// Decrement the number of times a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
if self.rc.block_decref(hash)? { if self.rc.block_decref(tx, &hash)? {
// When the RC is decremented, it might drop to zero, // When the RC is decremented, it might drop to zero,
// indicating that we don't need the block. // indicating that we don't need the block.
// There is a delay before we garbage collect it; // There is a delay before we garbage collect it;
// make sure that it is handled in the resync loop // make sure that it is handled in the resync loop
// after that delay has passed. // after that delay has passed.
self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; let this = self.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10))
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
});
} }
Ok(()) Ok(())
} }
@ -510,12 +523,12 @@ impl BlockManager {
}); });
} }
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64; let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when) self.put_to_resync_at(hash, when)
} }
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> { fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> {
trace!("Put resync_queue: {} {:?}", when, hash); trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec(); let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref()); key.extend(hash.as_ref());

View file

@ -19,35 +19,29 @@ impl BlockRc {
/// Increment the reference counter associated to a hash. /// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero. /// Returns true if the RC goes from zero to nonzero.
pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
let old_rc = self.rc.db().transaction(|mut tx| { let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); match old_rc.increment().serialize() {
match old_rc.increment().serialize() { Some(x) => {
Some(x) => { tx.insert(&self.rc, &hash, x)?;
tx.insert(&self.rc, &hash, x)?; }
} None => unreachable!(),
None => unreachable!(), };
};
tx.commit(old_rc)
})?;
Ok(old_rc.is_zero()) Ok(old_rc.is_zero())
} }
/// Decrement the reference counter associated to a hash. /// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero. /// Returns true if the RC is now zero.
pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
let new_rc = self.rc.db().transaction(|mut tx| { let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); match new_rc.serialize() {
match new_rc.serialize() { Some(x) => {
Some(x) => { tx.insert(&self.rc, &hash, x)?;
tx.insert(&self.rc, &hash, x)?; }
} None => {
None => { tx.remove(&self.rc, &hash)?;
tx.remove(&self.rc, &hash)?; }
} };
};
tx.commit(new_rc)
})?;
Ok(matches!(new_rc, RcEntry::Deletable { .. })) Ok(matches!(new_rc, RcEntry::Deletable { .. }))
} }

View file

@ -116,8 +116,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
type E = CounterEntry<T>; type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>); type Filter = (DeletedFilter, Vec<Uuid>);
fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { fn updated(
&self,
_tx: &mut db::Transaction,
_old: Option<&Self::E>,
_new: Option<&Self::E>,
) -> db::Result<()> {
// nothing for now // nothing for now
Ok(())
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
@ -176,36 +182,36 @@ impl<T: CounterSchema> IndexCounter<T> {
this this
} }
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { pub fn count(
&self,
tx: &mut db::Transaction,
pk: &T::P,
sk: &T::S,
counts: &[(&str, i64)],
) -> db::TxResult<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk); let tree_key = self.table.data.tree_key(pk, sk);
let new_entry = self.local_counter.db().transaction(|mut tx| { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
Some(old_bytes) => { .map_err(Error::RmpDecode)
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) .map_err(db::TxError::Abort)?,
.map_err(Error::RmpDecode) None => LocalCounterEntry {
.map_err(db::TxError::Abort)? values: BTreeMap::new(),
} },
None => LocalCounterEntry { };
values: BTreeMap::new(),
},
};
for (s, inc) in counts.iter() { for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
ent.0 += 1; ent.0 += 1;
ent.1 += *inc; ent.1 += *inc;
} }
let new_entry_bytes = rmp_to_vec_all_named(&entry) let new_entry_bytes = rmp_to_vec_all_named(&entry)
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?; .map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
Ok(entry) if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
})?;
if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
error!( error!(
"Could not propagate updated counter values, failed to send to channel: {}", "Could not propagate updated counter values, failed to send to channel: {}",
e e

View file

@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
type E = K2VItem; type E = K2VItem;
type Filter = ItemFilter; type Filter = ItemFilter;
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { fn updated(
&self,
tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::Result<()> {
// 1. Count // 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old { let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0), None => (0, 0, 0, 0),
@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
.map(|e| &e.partition.partition_key) .map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key); .unwrap_or_else(|| &new.unwrap().partition.partition_key);
if let Err(e) = self.counter_table.count( match self.counter_table.count(
tx,
&count_pk, &count_pk,
count_sk, count_sk,
&[ &[
@ -249,13 +256,25 @@ impl TableSchema for K2VItemTable {
(BYTES, new_bytes - old_bytes), (BYTES, new_bytes - old_bytes),
], ],
) { ) {
error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); Ok(()) => (),
Err(db::TxError::Db(e)) => return Err(e),
Err(db::TxError::Abort(e)) => {
// This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do.
error!(
"Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
count_pk, count_sk, e
);
}
} }
// 2. Notify // 2. Notify
if let Some(new_ent) = new { if let Some(new_ent) = new {
self.subscriptions.notify(new_ent); self.subscriptions.notify(new_ent);
} }
Ok(())
} }
#[allow(clippy::nonminimal_bool)] #[allow(clippy::nonminimal_bool)]

View file

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::Crdt; use garage_table::crdt::Crdt;
@ -51,21 +53,23 @@ impl TableSchema for BlockRefTable {
type E = BlockRef; type E = BlockRef;
type Filter = DeletedFilter; type Filter = DeletedFilter;
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { fn updated(
&self,
tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::Result<()> {
#[allow(clippy::or_fun_call)] #[allow(clippy::or_fun_call)]
let block = &old.or(new).unwrap().block; let block = old.or(new).unwrap().block;
let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before { if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) { self.block_manager.block_incref(tx, block)?;
warn!("block_incref failed for block {:?}: {}", block, e);
}
} }
if was_before && !is_after { if was_before && !is_after {
if let Err(e) = self.block_manager.block_decref(block) { self.block_manager.block_decref(tx, block)?;
warn!("block_decref failed for block {:?}: {}", block, e);
}
} }
Ok(())
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
@ -232,7 +234,12 @@ impl TableSchema for ObjectTable {
type E = Object; type E = Object;
type Filter = ObjectFilter; type Filter = ObjectFilter;
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { fn updated(
&self,
_tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::Result<()> {
let version_table = self.version_table.clone(); let version_table = self.version_table.clone();
let old = old.cloned(); let old = old.cloned();
let new = new.cloned(); let new = new.cloned();
@ -259,7 +266,8 @@ impl TableSchema for ObjectTable {
} }
} }
Ok(()) Ok(())
}) });
Ok(())
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
@ -137,7 +139,12 @@ impl TableSchema for VersionTable {
type E = Version; type E = Version;
type Filter = DeletedFilter; type Filter = DeletedFilter;
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { fn updated(
&self,
_tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::Result<()> {
let block_ref_table = self.block_ref_table.clone(); let block_ref_table = self.block_ref_table.clone();
let old = old.cloned(); let old = old.cloned();
let new = new.cloned(); let new = new.cloned();
@ -160,7 +167,9 @@ impl TableSchema for VersionTable {
} }
} }
Ok(()) Ok(())
}) });
Ok(())
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {

View file

@ -209,17 +209,20 @@ where
let new_bytes_hash = blake2sum(&new_bytes[..]); let new_bytes_hash = blake2sum(&new_bytes[..]);
tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, tree_key, new_bytes)?; tx.insert(&self.store, tree_key, new_bytes)?;
Ok(Some((old_entry, new_entry, new_bytes_hash)))
self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash)))
} else { } else {
Ok(None) Ok(None)
} }
})?; })?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed { if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1); self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry.as_ref(), Some(&new_entry));
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
if is_tombstone { if is_tombstone {
// We are only responsible for GC'ing this item if we are the // We are only responsible for GC'ing this item if we are the
@ -242,20 +245,23 @@ where
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = self.store.db().transaction(|mut tx| { let removed = self
let remove = matches!(tx.get(&self.store, k)?, Some(cur_v) if cur_v == v); .store
if remove { .db()
tx.remove(&self.store, k)?; .transaction(|mut tx| match tx.get(&self.store, k)? {
tx.insert(&self.merkle_todo, k, vec![])?; Some(cur_v) if cur_v == v => {
} tx.remove(&self.store, k)?;
Ok(remove) tx.insert(&self.merkle_todo, k, vec![])?;
})?;
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
_ => Ok(false),
})?;
if removed { if removed {
self.metrics.internal_delete_counter.add(1); self.metrics.internal_delete_counter.add(1);
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(&old_entry), None);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} }
Ok(removed) Ok(removed)
@ -266,26 +272,26 @@ where
k: &[u8], k: &[u8],
vhash: Hash, vhash: Hash,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let removed = self.store.db().transaction(|mut tx| { let removed = self
let remove_v = match tx.get(&self.store, k)? { .store
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => Some(cur_v), .db()
_ => None, .transaction(|mut tx| match tx.get(&self.store, k)? {
}; Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
if remove_v.is_some() { tx.remove(&self.store, k)?;
tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?;
tx.insert(&self.merkle_todo, k, vec![])?;
}
Ok(remove_v)
})?;
if let Some(old_v) = removed { let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(&mut tx, Some(&old_entry), None)?;
self.instance.updated(Some(&old_entry), None); Ok(true)
}
_ => Ok(false),
})?;
if removed {
self.metrics.internal_delete_counter.add(1);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
} }
Ok(removed)
} }
// ---- Utility functions ---- // ---- Utility functions ----

View file

@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use crate::crdt::Crdt; use crate::crdt::Crdt;
@ -82,11 +83,19 @@ pub trait TableSchema: Send + Sync {
None None
} }
// Updated triggers some stuff downstream, but it is not supposed to block or fail, /// Actions triggered by data changing in a table. If such actions
// as the update itself is an unchangeable fact that will never go back /// include updates to the local database that should be applied
// due to CRDT logic. Typically errors in propagation of info should be logged /// atomically with the item update itself, a db transaction is
// to stderr. /// provided on which these changes should be done.
fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {} /// This function can return a DB error but that's all.
fn updated(
&self,
_tx: &mut db::Transaction,
_old: Option<&Self::E>,
_new: Option<&Self::E>,
) -> db::Result<()> {
Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
} }