Improvements to garbage collector (fix #39) #135
@@ -12,6 +12,7 @@ use garage_util::error::*;
 use garage_rpc::system::System;
 
 use crate::crdt::Crdt;
+use crate::gc::GcTodoEntry;
 use crate::replication::*;
 use crate::schema::*;
 
@@ -176,7 +177,7 @@ where
 			let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
 			let nodes = self.replication.write_nodes(&pk_hash);
 			if nodes.first() == Some(&self.system.id) {
-				self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+				GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
 			}
 		}
 	}
src/table/gc.rs: 150 changes

@@ -12,7 +12,7 @@ use futures_util::future::*;
 use tokio::sync::watch;
 
 use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::*;
 
 use garage_rpc::system::System;
 use garage_rpc::*;

@@ -24,7 +24,7 @@ use crate::schema::*;
 const TABLE_GC_BATCH_SIZE: usize = 1024;
 const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
 
-pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
 	system: Arc<System>,
 	data: Arc<TableData<F, R>>,
 
@@ -94,31 +94,45 @@ where
 		let mut entries = vec![];
 		let mut excluded = vec![];
 
-		for item in self.data.gc_todo.iter() {
-			let (k, vhash) = item?;
+		// List entries in the GC todo list
+		// These entries are put there when a tombstone is inserted in the table
+		// This is detected and done in data.rs in update_entry
+		for entry_kv in self.data.gc_todo.iter() {
+			let (k, vhash) = entry_kv?;
+			let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
 
 			let vhash = Hash::try_from(&vhash[..]).unwrap();
 
-			let v_opt = self
+			// Check if the tombstone is still the current value of the entry.
+			// If not, we don't actually want to GC it, and we will remove it
+			// from the gc_todo table later (below).
+			todo_entry.value = self
 				.data
 				.store
 				.get(&k[..])?
-				.filter(|v| blake2sum(&v[..]) == vhash);
+				.filter(|v| blake2sum(&v[..]) == vhash)
+				.map(|v| v.to_vec());
 
-			if let Some(v) = v_opt {
-				entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+			if todo_entry.value.is_some() {
+				entries.push(todo_entry);
 				if entries.len() >= TABLE_GC_BATCH_SIZE {
 					break;
 				}
 			} else {
-				excluded.push((k, vhash));
+				excluded.push(todo_entry);
 			}
 		}
 
-		for (k, vhash) in excluded {
-			self.todo_remove_if_equal(&k[..], vhash)?;
+		// Remove from gc_todo entries for tombstones where we have
+		// detected that the current value has changed and
+		// is no longer a tombstone.
+		for entry in excluded {
+			entry.remove_if_equal(&self.data.gc_todo)?;
 		}
 
+		// Remaining in `entries` is the list of entries we want to GC,
+		// and for which they are still currently tombstones in the table.
+
 		if entries.is_empty() {
 			// Nothing to do in this iteration
 			return Ok(false);
@ -126,9 +140,17 @@ where
|
||||||
|
|
||||||
debug!("({}) GC: doing {} items", self.data.name, entries.len());
|
debug!("({}) GC: doing {} items", self.data.name, entries.len());
|
||||||
|
|
||||||
|
// Split entries to GC by the set of nodes on which they are stored.
|
||||||
|
// Here we call them partitions but they are not exactly
|
||||||
|
// the same as partitions as defined in the ring: those partitions
|
||||||
|
// are defined by the first 8 bits of the hash, but two of these
|
||||||
|
// partitions can be stored on the same set of nodes.
|
||||||
|
// Here we detect when entries are stored on the same set of nodes:
|
||||||
|
// even if they are not in the same 8-bit partition, we can still
|
||||||
|
// handle them together.
|
||||||
let mut partitions = HashMap::new();
|
let mut partitions = HashMap::new();
|
||||||
for (k, vhash, v) in entries {
|
for entry in entries {
|
||||||
let pkh = Hash::try_from(&k[..32]).unwrap();
|
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
|
||||||
let mut nodes = self.data.replication.write_nodes(&pkh);
|
let mut nodes = self.data.replication.write_nodes(&pkh);
|
||||||
nodes.retain(|x| *x != self.system.id);
|
nodes.retain(|x| *x != self.system.id);
|
||||||
nodes.sort();
|
nodes.sort();
|
||||||
|
@@ -136,9 +158,12 @@ where
 			if !partitions.contains_key(&nodes) {
 				partitions.insert(nodes.clone(), vec![]);
 			}
-			partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
+			partitions.get_mut(&nodes).unwrap().push(entry);
 		}
 
+		// For each set of nodes that contains some items,
+		// ensure they are aware of the tombstone status, and once they
+		// are, instruct them to delete the entries.
 		let resps = join_all(
 			partitions
 				.into_iter()
@@ -146,6 +171,8 @@ where
 		)
 		.await;
 
+		// Collect errors and return a single error value even if several
+		// errors occurred.
 		let mut errs = vec![];
 		for resp in resps {
 			if let Err(e) = resp {
@@ -162,23 +189,40 @@ where
 					.collect::<Vec<_>>()
 					.join(", "),
 			))
+			.err_context("in try_send_and_delete:")
 		}
 	}
 
 	async fn try_send_and_delete(
 		&self,
 		nodes: Vec<Uuid>,
-		items: Vec<(ByteBuf, Hash, ByteBuf)>,
+		items: Vec<GcTodoEntry>,
 	) -> Result<(), Error> {
 		let n_items = items.len();
 
+		// Strategy: we first send all of the values to the remote nodes,
+		// to ensure that they are aware of the tombstone state.
+		// (if they have a newer state that overrides the tombstone, that's fine).
+		// Second, once everyone is at least at the tombstone state,
+		// we instruct everyone to delete the tombstone IF that is still their current state.
+		// If they are now at a different state, it means that that state overrides the
+		// tombstone in the CRDT lattice, and it will be propagated back to us at some point
+		// (either just a regular update that hasn't reached us yet, or later when the
+		// table is synced).
+		// Here, we store in updates all of the tombstones to send for step 1,
+		// and in deletes the list of keys and hashes of value for step 2.
 		let mut updates = vec![];
 		let mut deletes = vec![];
-		for (k, vhash, v) in items {
-			updates.push(v);
-			deletes.push((k, vhash));
+		for item in items {
+			updates.push(ByteBuf::from(item.value.unwrap()));
+			deletes.push((ByteBuf::from(item.key), item.value_hash));
 		}
 
+		// Step 1: ensure everyone is at least at tombstone in CRDT lattice
+		// Here the quorum is nodes.len(): we cannot tolerate even a single failure,
+		// otherwise old values before the tombstone might come back in the data.
+		// GC'ing is not a critical function of the system, so it's not a big
+		// deal if we can't do it right now.
quentin commented:
Does it mean that deleting a tombstone is a particularly expensive operation? Could we create a denial of service simply by deleting a lot of data in a row, or by deleting a lot of data in a row while a node is down? But it might be totally ok, as it seems we batch our requests; can you confirm?
Edit: I just understood that `nodes` does not map to all nodes of the cluster but to all nodes of the same "partition".

lx commented:
It's not that expensive (tombstones are small and can be batched), and it's the cost of a correct algorithm :) At least we are not doing a full Raft or Paxos.
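To make the quorum argument above concrete, here is a toy model (my own sketch, not code from this PR or from Garage) of the resurrection hazard that step 1's all-nodes quorum prevents:

```rust
// Three replicas of one entry in a toy CRDT lattice where
// Tombstone > Value > Empty. If the tombstone is deleted while some
// replica still holds only the old value, anti-entropy resurrects it.
#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
	Value,     // live value V
	Tombstone, // tombstone T superseding V
	Empty,     // entry fully GC'd
}

// Anti-entropy: all nodes converge to the largest state present.
fn sync(nodes: &mut [State; 3]) {
	let max = if nodes.contains(&State::Tombstone) {
		State::Tombstone
	} else if nodes.contains(&State::Value) {
		State::Value
	} else {
		State::Empty
	};
	*nodes = [max; 3];
}

fn main() {
	// Node 2 was down and never received the tombstone.
	let mut nodes = [State::Tombstone, State::Tombstone, State::Value];
	// A faulty GC with a 2/3 quorum deletes the tombstone anyway.
	nodes[0] = State::Empty;
	nodes[1] = State::Empty;
	sync(&mut nodes);
	// The deleted value came back everywhere: the deletion is undone.
	assert_eq!(nodes, [State::Value; 3]);
	// Requiring quorum == nodes.len() in step 1 guarantees step 2 never
	// runs while any replica still holds only Value.
}
```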
 		self.system
 			.rpc
 			.try_call_many(
@@ -189,13 +233,18 @@ where
 					.with_quorum(nodes.len())
 					.with_timeout(TABLE_GC_RPC_TIMEOUT),
 			)
-			.await?;
+			.await
+			.err_context("GC: send tombstones")?;
 
 		info!(
 			"({}) GC: {} items successfully pushed, will try to delete.",
 			self.data.name, n_items
 		);
 
+		// Step 2: delete tombstones everywhere.
+		// Here we also fail if even a single node returns a failure:
+		// it means that the garbage collection wasn't completed and has
+		// to be retried later.
 		self.system
 			.rpc
 			.try_call_many(
@@ -206,11 +255,17 @@ where
 					.with_quorum(nodes.len())
 					.with_timeout(TABLE_GC_RPC_TIMEOUT),
 			)
-			.await?;
+			.await
+			.err_context("GC: remote delete tombstones")?;
 
+		// GC has been successfull for all of these entries.
+		// We now remove them all from our local table and from the GC todo list.
 		for (k, vhash) in deletes {
-			self.data.delete_if_equal_hash(&k[..], vhash)?;
-			self.todo_remove_if_equal(&k[..], vhash)?;
+			self.data
+				.delete_if_equal_hash(&k[..], vhash)
+				.err_context("GC: local delete tombstones")?;
+			self.todo_remove_if_equal(&k[..], vhash)
+				.err_context("GC: remove from todo list after successfull GC")?;
 		}
 
 		Ok(())
@@ -248,3 +303,54 @@ where
 		}
 	}
 }
+
+/// An entry stored in the gc_todo Sled tree associated with the table
+/// Contains helper function for parsing, saving, and removing
+/// such entry in Sled
+pub(crate) struct GcTodoEntry {
+	key: Vec<u8>,
+	value_hash: Hash,
+	value: Option<Vec<u8>>,
+}
+
+impl GcTodoEntry {
+	/// Creates a new GcTodoEntry (not saved in Sled) from its components:
+	/// the key of an entry in the table, and the hash of the associated
+	/// serialized value
+	pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
+		Self {
+			key,
+			value_hash,
+			value: None,
+		}
+	}
quentin commented:
Here also we have some byte manipulations, and this is even the purpose of this object. Are we doing them because of a Sled limitation (it needs a primitive type as a key) or for backward-compatibility reasons (we were storing a byte array before, so we keep it)? Are you not afraid that it could introduce bugs with time, as we are losing some checks done by the compiler?

lx commented:
We are limited by Sled, which basically acts as a `BTreeMap<Vec<u8>, Vec<u8>>`: we have to manage stuff as byte arrays for both keys and values. It's true we have to be extra careful with this: that's the reason why I extracted the serialization and parsing logic in a separate struct. This would typically be a good place for unit testing if complexity gets out of hand (for now I don't think it has).
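A minimal sketch of the round-trip unit test alluded to here, assuming the `GcTodoEntry` fields and helpers shown in this diff (my illustration, not part of the PR):

```rust
// Hypothetical test module at the bottom of gc.rs; it relies on the
// GcTodoEntry definition above and on blake2sum/Hash from garage_util.
#[cfg(test)]
mod tests {
	use super::*;
	use garage_util::data::*;

	#[test]
	fn gc_todo_entry_roundtrip() {
		let key = b"table entry key".to_vec();
		let value_hash = blake2sum(b"some serialized tombstone");

		// save() writes (key, value_hash.as_slice()) into the Sled tree;
		// parse() on that same (k, v) pair must recover the same fields.
		let entry = GcTodoEntry::new(key.clone(), value_hash);
		let parsed = GcTodoEntry::parse(&entry.key[..], value_hash.as_slice());

		assert_eq!(parsed.key, key);
		assert_eq!(parsed.value_hash.as_slice(), value_hash.as_slice());
		assert!(parsed.value.is_none());
	}
}
```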
+
+	/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
+	pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+		Self {
+			key: sled_k.to_vec(),
+			value_hash: Hash::try_from(sled_v).unwrap(),
+			value: None,
+		}
+	}
+
+	/// Saves the GcTodoEntry in the gc_todo tree
+	pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+		gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?;
+		Ok(())
+	}
+
+	/// Removes the GcTodoEntry from the gc_todo tree if the
+	/// hash of the serialized value is the same here as in the tree.
+	/// This is usefull to remove a todo entry only under the condition
+	/// that it has not changed since the time it was read, i.e.
+	/// what we have to do is still the same
+	pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+		let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
+			&self.key[..],
+			Some(self.value_hash),
+			None,
+		)?;
+		Ok(())
+	}
+}
Can you confirm that one reason to invalidate a tombstone is a case where I upload a file, delete it, and then re-upload it?
Yes! For the object tables, an entry can take the following sequence of values:
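A toy sketch of that upload, delete, re-upload lifecycle (my own illustration with a stand-in checksum, not Garage code), showing why the GC loop re-checks the value hash before deleting anything:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// Stand-in for blake2sum in this illustration.
fn checksum(v: &[u8]) -> u64 {
	let mut h = DefaultHasher::new();
	v.hash(&mut h);
	h.finish()
}

fn main() {
	let mut table: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
	let mut gc_todo: BTreeMap<Vec<u8>, u64> = BTreeMap::new();
	let key = b"bucket/file".to_vec();

	// Upload: a live value is stored.
	table.insert(key.clone(), b"object v1".to_vec());

	// Delete: the value becomes a tombstone, and gc_todo records the
	// hash of the tombstone's serialized value (what update_entry does).
	table.insert(key.clone(), b"tombstone of v1".to_vec());
	gc_todo.insert(key.clone(), checksum(b"tombstone of v1"));

	// Re-upload before the GC loop runs: the tombstone is superseded.
	table.insert(key.clone(), b"object v2".to_vec());

	// GC pass: the recorded hash no longer matches the current value,
	// so the entry goes to `excluded` and is dropped via the
	// remove_if_equal path instead of being GC'd; v2 is preserved.
	if checksum(&table[&key]) == gc_todo[&key] {
		table.remove(&key);
	} else {
		gc_todo.remove(&key);
	}
	assert!(table.contains_key(&key));
	assert!(gc_todo.is_empty());
}
```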