Fix a bug when a migration is followed by a rebalance
Nodes would stabilize on different encoding formats for the values, some having the pre-migration format and some having the post-migration format. This would be reflected in the Merkle trees never converging and thus having an infinite resync loop.
This commit is contained in:
parent
4ae03aa774
commit
af261e1789
1 changed files with 18 additions and 9 deletions
|
@ -130,22 +130,31 @@ where
|
||||||
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
||||||
|
|
||||||
let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
|
let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
|
||||||
let (old_entry, new_entry) = match store.get(&tree_key)? {
|
let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? {
|
||||||
Some(prev_bytes) => {
|
Some(old_bytes) => {
|
||||||
let old_entry = self
|
let old_entry = self
|
||||||
.decode_entry(&prev_bytes)
|
.decode_entry(&old_bytes)
|
||||||
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
||||||
let mut new_entry = old_entry.clone();
|
let mut new_entry = old_entry.clone();
|
||||||
new_entry.merge(&update);
|
new_entry.merge(&update);
|
||||||
(Some(old_entry), new_entry)
|
(Some(old_entry), Some(old_bytes), new_entry)
|
||||||
}
|
}
|
||||||
None => (None, update.clone()),
|
None => (None, None, update.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
if Some(&new_entry) != old_entry.as_ref() {
|
// Scenario 1: the value changed, so of course there is a change
|
||||||
|
let value_changed = Some(&new_entry) != old_entry.as_ref();
|
||||||
|
|
||||||
|
// Scenario 2: the value didn't change but due to a migration in the
|
||||||
|
// data format, the messagepack encoding changed. In this case
|
||||||
|
// we have to write the migrated value in the table and update
|
||||||
|
// the associated Merkle tree entry.
|
||||||
let new_bytes = rmp_to_vec_all_named(&new_entry)
|
let new_bytes = rmp_to_vec_all_named(&new_entry)
|
||||||
.map_err(Error::RmpEncode)
|
.map_err(Error::RmpEncode)
|
||||||
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
|
||||||
|
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
|
||||||
|
|
||||||
|
if value_changed || encoding_changed {
|
||||||
let new_bytes_hash = blake2sum(&new_bytes[..]);
|
let new_bytes_hash = blake2sum(&new_bytes[..]);
|
||||||
mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
|
mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
|
||||||
store.insert(tree_key.clone(), new_bytes)?;
|
store.insert(tree_key.clone(), new_bytes)?;
|
||||||
|
|
Loading…
Reference in a new issue