Leader-based GC
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Alex 2021-03-16 18:42:33 +01:00
parent 08bcd51956
commit 7b10245dfb
2 changed files with 18 additions and 4 deletions

View file

@ -9,13 +9,16 @@ use tokio::sync::Notify;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_rpc::membership::System;
use crate::crdt::CRDT; use crate::crdt::CRDT;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
pub struct TableData<F: TableSchema, R: TableReplication> { pub struct TableData<F: TableSchema, R: TableReplication> {
pub name: String, system: Arc<System>,
pub name: String,
pub(crate) instance: F, pub(crate) instance: F,
pub(crate) replication: R, pub(crate) replication: R,
@ -32,7 +35,7 @@ where
F: TableSchema, F: TableSchema,
R: TableReplication, R: TableReplication,
{ {
pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", name)) .open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
@ -49,6 +52,7 @@ where
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
Arc::new(Self { Arc::new(Self {
system,
name, name,
instance, instance,
replication, replication,
@ -157,7 +161,17 @@ where
self.instance.updated(old_entry, Some(new_entry)); self.instance.updated(old_entry, Some(new_entry));
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
if is_tombstone { if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; // We are only responsible for GC'ing this item if we are the
// "leader" of the partition, i.e. the first node in the
// set of nodes that replicates this partition.
// This avoids GC loops and does not change the termination properties
// of the GC algorithm, as in all cases GC is suspended if
// any node of the partition is unavailable.
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
} }
} }

View file

@ -64,7 +64,7 @@ where
let rpc_path = format!("table_{}", name); let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
let data = TableData::new(name, instance, replication, db); let data = TableData::new(system.clone(), name, instance, replication, db);
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());