From cc255d46cdde707e6dec3e9a12ee4cd1ea484046 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 28 Oct 2021 12:49:37 +0200 Subject: [PATCH 1/4] Refactor and comment table GC logic --- src/table/data.rs | 3 +- src/table/gc.rs | 150 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 130 insertions(+), 23 deletions(-) diff --git a/src/table/data.rs b/src/table/data.rs index ffd494d5..13ec62bf 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -12,6 +12,7 @@ use garage_util::error::*; use garage_rpc::system::System; use crate::crdt::Crdt; +use crate::gc::GcTodoEntry; use crate::replication::*; use crate::schema::*; @@ -176,7 +177,7 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 9b3d60ff..cbd91b3a 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,7 +12,7 @@ use futures_util::future::*; use tokio::sync::watch; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::*; use garage_rpc::system::System; use garage_rpc::*; @@ -24,7 +24,7 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGc { +pub(crate) struct TableGc { system: Arc, data: Arc>, @@ -94,31 +94,45 @@ where let mut entries = vec![]; let mut excluded = vec![]; - for item in self.data.gc_todo.iter() { - let (k, vhash) = item?; + // List entries in the GC todo list + // These entries are put there when a tombstone is inserted in the table + // This is detected and done in data.rs in update_entry + for entry_kv in self.data.gc_todo.iter() { + let (k, vhash) = entry_kv?; + let mut todo_entry = GcTodoEntry::parse(&k, &vhash); let vhash = Hash::try_from(&vhash[..]).unwrap(); - let v_opt = self + // Check if the tombstone is still the current value of the entry. + // If not, we don't actually want to GC it, and we will remove it + // from the gc_todo table later (below). + todo_entry.value = self .data .store .get(&k[..])? - .filter(|v| blake2sum(&v[..]) == vhash); + .filter(|v| blake2sum(&v[..]) == vhash) + .map(|v| v.to_vec()); - if let Some(v) = v_opt { - entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); + if todo_entry.value.is_some() { + entries.push(todo_entry); if entries.len() >= TABLE_GC_BATCH_SIZE { break; } } else { - excluded.push((k, vhash)); + excluded.push(todo_entry); } } - for (k, vhash) in excluded { - self.todo_remove_if_equal(&k[..], vhash)?; + // Remove from gc_todo entries for tombstones where we have + // detected that the current value has changed and + // is no longer a tombstone. + for entry in excluded { + entry.remove_if_equal(&self.data.gc_todo)?; } + // Remaining in `entries` is the list of entries we want to GC, + // and for which they are still currently tombstones in the table. + if entries.is_empty() { // Nothing to do in this iteration return Ok(false); @@ -126,9 +140,17 @@ where debug!("({}) GC: doing {} items", self.data.name, entries.len()); + // Split entries to GC by the set of nodes on which they are stored. 
+ // Here we call them partitions but they are not exactly + // the same as partitions as defined in the ring: those partitions + // are defined by the first 8 bits of the hash, but two of these + // partitions can be stored on the same set of nodes. + // Here we detect when entries are stored on the same set of nodes: + // even if they are not in the same 8-bit partition, we can still + // handle them together. let mut partitions = HashMap::new(); - for (k, vhash, v) in entries { - let pkh = Hash::try_from(&k[..32]).unwrap(); + for entry in entries { + let pkh = Hash::try_from(&entry.key[..32]).unwrap(); let mut nodes = self.data.replication.write_nodes(&pkh); nodes.retain(|x| *x != self.system.id); nodes.sort(); @@ -136,9 +158,12 @@ where if !partitions.contains_key(&nodes) { partitions.insert(nodes.clone(), vec![]); } - partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); + partitions.get_mut(&nodes).unwrap().push(entry); } + // For each set of nodes that contains some items, + // ensure they are aware of the tombstone status, and once they + // are, instruct them to delete the entries. let resps = join_all( partitions .into_iter() @@ -146,6 +171,8 @@ where ) .await; + // Collect errors and return a single error value even if several + // errors occurred. let mut errs = vec![]; for resp in resps { if let Err(e) = resp { @@ -162,23 +189,40 @@ where .collect::>() .join(", "), )) + .err_context("in try_send_and_delete:") } } async fn try_send_and_delete( &self, nodes: Vec, - items: Vec<(ByteBuf, Hash, ByteBuf)>, + items: Vec, ) -> Result<(), Error> { let n_items = items.len(); + // Strategy: we first send all of the values to the remote nodes, + // to ensure that they are aware of the tombstone state. + // (if they have a newer state that overrides the tombstone, that's fine). + // Second, once everyone is at least at the tombstone state, + // we instruct everyone to delete the tombstone IF that is still their current state. + // If they are now at a different state, it means that that state overrides the + // tombstone in the CRDT lattice, and it will be propagated back to us at some point + // (either just a regular update that hasn't reached us yet, or later when the + // table is synced). + // Here, we store in updates all of the tombstones to send for step 1, + // and in deletes the list of keys and hashes of value for step 2. let mut updates = vec![]; let mut deletes = vec![]; - for (k, vhash, v) in items { - updates.push(v); - deletes.push((k, vhash)); + for item in items { + updates.push(ByteBuf::from(item.value.unwrap())); + deletes.push((ByteBuf::from(item.key), item.value_hash)); } + // Step 1: ensure everyone is at least at tombstone in CRDT lattice + // Here the quorum is nodes.len(): we cannot tolerate even a single failure, + // otherwise old values before the tombstone might come back in the data. + // GC'ing is not a critical function of the system, so it's not a big + // deal if we can't do it right now. self.system .rpc .try_call_many( @@ -189,13 +233,18 @@ where .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), ) - .await?; + .await + .err_context("GC: send tombstones")?; info!( "({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items ); + // Step 2: delete tombstones everywhere. + // Here we also fail if even a single node returns a failure: + // it means that the garbage collection wasn't completed and has + // to be retried later. 
self.system .rpc .try_call_many( @@ -206,11 +255,17 @@ where .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), ) - .await?; + .await + .err_context("GC: remote delete tombstones")?; + // GC has been successfull for all of these entries. + // We now remove them all from our local table and from the GC todo list. for (k, vhash) in deletes { - self.data.delete_if_equal_hash(&k[..], vhash)?; - self.todo_remove_if_equal(&k[..], vhash)?; + self.data + .delete_if_equal_hash(&k[..], vhash) + .err_context("GC: local delete tombstones")?; + self.todo_remove_if_equal(&k[..], vhash) + .err_context("GC: remove from todo list after successfull GC")?; } Ok(()) @@ -248,3 +303,54 @@ where } } } + +/// An entry stored in the gc_todo Sled tree associated with the table +/// Contains helper function for parsing, saving, and removing +/// such entry in Sled +pub(crate) struct GcTodoEntry { + key: Vec, + value_hash: Hash, + value: Option>, +} + +impl GcTodoEntry { + /// Creates a new GcTodoEntry (not saved in Sled) from its components: + /// the key of an entry in the table, and the hash of the associated + /// serialized value + pub(crate) fn new(key: Vec, value_hash: Hash) -> Self { + Self { + key, + value_hash, + value: None, + } + } + + /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree + pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + Self { + key: sled_k.to_vec(), + value_hash: Hash::try_from(sled_v).unwrap(), + value: None, + } + } + + /// Saves the GcTodoEntry in the gc_todo tree + pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?; + Ok(()) + } + + /// Removes the GcTodoEntry from the gc_todo tree if the + /// hash of the serialized value is the same here as in the tree. + /// This is usefull to remove a todo entry only under the condition + /// that it has not changed since the time it was read, i.e. + /// what we have to do is still the same + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + let _ = gc_todo_tree.compare_and_swap::<_, _, Vec>( + &self.key[..], + Some(self.value_hash), + None, + )?; + Ok(()) + } +} -- 2.40.1 From 74a7a550eb5c076d21185d44a1bdb60577a47779 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 28 Oct 2021 14:32:55 +0200 Subject: [PATCH 2/4] Safety: never voluntarily delete block in 10min interval after RC reaches zero --- src/garage/admin.rs | 2 +- src/garage/cli/structs.rs | 3 +- src/model/block.rs | 228 ++++++++++++++++++++++++++++++-------- 3 files changed, 182 insertions(+), 51 deletions(-) diff --git a/src/garage/admin.rs b/src/garage/admin.rs index b5fc9a7e..c3a83d02 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -446,7 +446,7 @@ impl AdminRpcHandler { if opt.detailed { writeln!( &mut ret, - " number of blocks: {}", + " number of RC entries (~= number of blocks): {}", self.garage.block_manager.rc_len() ) .unwrap(); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 0df6ef87..b930d8a8 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -8,8 +8,7 @@ pub enum Command { #[structopt(name = "server")] Server, - /// Print identifier (public key) of this garage node. - /// Generates a new keypair if necessary. 
+ /// Print identifier (public key) of this Garage node #[structopt(name = "node-id")] NodeId(NodeIdOpt), diff --git a/src/model/block.rs b/src/model/block.rs index 406abf7b..3f40aaab 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -31,10 +31,20 @@ pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_TRANQUILITY: u32 = 3; -const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); -const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); +// Timeout for RPCs that read and write blocks to remote nodes +const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); +// Timeout for RPCs that ask other nodes whether they need a copy +// of a given block before we delete it locally const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); -const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); + +// The delay between the time where a resync operation fails +// and the time when it is retried. +const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); + +// The delay between the moment when the reference counter +// drops to zero, and the moment where we allow ourselves +// to delete the block locally. +const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] @@ -180,7 +190,7 @@ impl BlockManager { /// that are required because of refcount > 0, and will try /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - // 1. Repair blocks from RC table + // 1. Repair blocks from RC table. let garage = self.garage.load_full().unwrap(); let mut last_hash = None; for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() { @@ -245,40 +255,51 @@ impl BlockManager { /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.fetch_and_update(&hash, |old| { - let old_v = old.map(u64_from_be_bytes).unwrap_or(0); - Some(u64::to_be_bytes(old_v + 1).to_vec()) - })?; - let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0); - if old_rc == 0 { - self.put_to_resync(hash, BLOCK_RW_TIMEOUT)?; + let old_rc = self + .rc + .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?; + let old_rc = RcEntry::parse_opt(old_rc); + if old_rc.is_zero() { + // When the reference counter is incremented, there is + // normally a node that is responsible for sending us the + // data of the block. However that operation may fail, + // so in all cases we add the block here to the todo list + // to check later that it arrived correctly, and if not + // we will fecth it from someone. + self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; } Ok(()) } /// Decrement the number of time a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.update_and_fetch(&hash, |old| { - let old_v = old.map(u64_from_be_bytes).unwrap_or(0); - if old_v > 1 { - Some(u64::to_be_bytes(old_v - 1).to_vec()) - } else { - None - } - })?; - if new_rc.is_none() { - self.put_to_resync(hash, BLOCK_GC_TIMEOUT)?; + let new_rc = self + .rc + .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?; + let new_rc = RcEntry::parse_opt(new_rc); + if let RcEntry::Deletable { .. 
} = new_rc { + self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; } Ok(()) } /// Read a block's reference count - pub fn get_block_rc(&self, hash: &Hash) -> Result { - Ok(self - .rc - .get(hash.as_ref())? - .map(u64_from_be_bytes) - .unwrap_or(0)) + fn get_block_rc(&self, hash: &Hash) -> Result { + Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?)) + } + + /// Delete an entry in the RC table if it is deletable and the + /// deletion time has passed + fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + self.rc.update_and_fetch(&hash, |rcval| { + let updated = match RcEntry::parse_opt(rcval) { + RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent, + v => v, + }; + updated.serialize() + })?; + Ok(()) } // ---- Reading and writing blocks locally ---- @@ -300,7 +321,7 @@ impl BlockManager { Ok(f) => f, Err(e) => { // Not found but maybe we should have had it ?? - self.put_to_resync(hash, Duration::from_millis(0))?; + self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; return Err(Into::into(e)); } }; @@ -314,6 +335,7 @@ impl BlockManager { .await .move_block_to_corrupted(hash, self) .await?; + self.put_to_resync(hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } @@ -328,7 +350,7 @@ impl BlockManager { .await .check_block_status(hash, self) .await?; - Ok(needed && !exists) + Ok(needed.is_nonzero() && !exists) } /// Utility: gives the path of the directory in which a block should be found @@ -349,7 +371,7 @@ impl BlockManager { // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch n simultaneous workers for background resync loop preprocessing for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -407,7 +429,7 @@ impl BlockManager { let res = self.resync_block(&hash).await; if let Err(e) = &res { warn!("Error when resyncing {:?}: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; + self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; } Ok(true) } else { @@ -437,15 +459,18 @@ impl BlockManager { .check_block_status(hash, self) .await?; - if exists != needed { - info!( - "Resync block {:?}: exists {}, needed {}", - hash, exists, needed + if exists != needed.is_needed() || exists != needed.is_nonzero() { + debug!( + "Resync block {:?}: exists {}, nonzero rc {}, deletable {}", + hash, + exists, + needed.is_nonzero(), + needed.is_deletable(), ); } - if exists && !needed { - trace!("Offloading block {:?}", hash); + if exists && needed.is_deletable() { + info!("Resync block {:?}: offloading and deleting", hash); let mut who = self.replication.write_nodes(hash); if who.len() < self.replication.write_quorum() { @@ -488,7 +513,7 @@ impl BlockManager { need_nodes.len() ); - let put_block_message = self.read_block(hash).await.err_context("PutBlock RPC")?; + let put_block_message = self.read_block(hash).await?; self.system .rpc .try_call_many( @@ -499,10 +524,11 @@ impl BlockManager { .with_quorum(need_nodes.len()) .with_timeout(BLOCK_RW_TIMEOUT), ) - .await?; + .await + .err_context("PutBlock RPC")?; } info!( - "Deleting block {:?}, offload finished ({} / {})", + "Deleting unneeded block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len() @@ -513,12 +539,16 @@ impl BlockManager { .await .delete_if_unneeded(hash, self) .await?; + + self.clear_deleted_block_rc(hash)?; } - if needed && !exists { - // TODO find a way to not do this if 
they are sending it to us - // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay - // between the RC being incremented and this part being called. + if needed.is_nonzero() && !exists { + info!( + "Resync block {:?}: fetching absent but needed block (refcount > 0)", + hash + ); + let block_data = self.rpc_get_block(hash).await?; self.write_block(hash, &block_data[..]).await?; } @@ -526,6 +556,8 @@ impl BlockManager { Ok(()) } + // ---- Utility: iteration on files in the data directory ---- + async fn for_each_file( &self, state: State, @@ -608,7 +640,7 @@ impl EndpointHandler for BlockManager { struct BlockStatus { exists: bool, - needed: bool, + needed: RcEntry, } impl BlockManagerLocked { @@ -620,7 +652,7 @@ impl BlockManagerLocked { let path = mgr.block_path(hash); let exists = fs::metadata(&path).await.is_ok(); - let needed = mgr.get_block_rc(hash)? > 0; + let needed = mgr.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) } @@ -659,14 +691,13 @@ impl BlockManagerLocked { let mut path2 = path.clone(); path2.set_extension("corrupted"); fs::rename(path, path2).await?; - mgr.put_to_resync(hash, Duration::from_millis(0))?; Ok(()) } async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; - if exists && !needed { + if exists && needed.is_deletable() { let path = mgr.block_path(hash); fs::remove_file(path).await?; } @@ -680,3 +711,104 @@ fn u64_from_be_bytes>(bytes: T) -> u64 { x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } + +/// Describes the state of the reference counter for a block +#[derive(Clone, Copy, Debug)] +enum RcEntry { + /// Present: the block has `count` references, with `count` > 0. + /// + /// This is stored as u64::to_be_bytes(count) + Present { count: u64 }, + + /// Deletable: the block has zero references, and can be deleted + /// once time (returned by now_msec) is larger than at_time + /// (in millis since Unix epoch) + /// + /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time), + /// (this allows for the data format to be backwards compatible with + /// previous Garage versions that didn't have this intermediate state) + Deletable { at_time: u64 }, + + /// Absent: the block has zero references, and can be deleted + /// immediately + Absent, +} + +impl RcEntry { + fn parse(bytes: &[u8]) -> Self { + if bytes.len() == 8 { + RcEntry::Present { + count: u64::from_be_bytes(bytes.try_into().unwrap()), + } + } else if bytes.len() == 16 { + RcEntry::Deletable { + at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + } + } else { + panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. 
Sorry, and also please report a bug.", + bytes + ) + } + } + + fn parse_opt>(bytes: Option) -> Self { + bytes + .map(|b| Self::parse(b.as_ref())) + .unwrap_or(Self::Absent) + } + + fn serialize(self) -> Option> { + match self { + RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()), + RcEntry::Deletable { at_time } => { + Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat()) + } + RcEntry::Absent => None, + } + } + + fn increment(self) -> Self { + let old_count = match self { + RcEntry::Present { count } => count, + _ => 0, + }; + RcEntry::Present { + count: old_count + 1, + } + } + + fn decrement(self) -> Self { + match self { + RcEntry::Present { count } => { + if count > 1 { + RcEntry::Present { count: count - 1 } + } else { + RcEntry::Deletable { + at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64, + } + } + } + del => del, + } + } + + fn is_zero(&self) -> bool { + matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent) + } + + fn is_nonzero(&self) -> bool { + !self.is_zero() + } + + fn is_deletable(&self) -> bool { + match self { + RcEntry::Present { .. } => false, + RcEntry::Deletable { at_time } => now_msec() > *at_time, + RcEntry::Absent => true, + } + } + + fn is_needed(&self) -> bool { + !self.is_deletable() + } +} -- 2.40.1 From ad7ab3141141d292f6f925d1712f0ebf4f906692 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 Nov 2021 22:07:43 +0100 Subject: [PATCH 3/4] Implement GC delay for table data --- src/model/block.rs | 10 +---- src/table/data.rs | 2 +- src/table/gc.rs | 108 ++++++++++++++++++++++++++++++--------------- 3 files changed, 76 insertions(+), 44 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 3f40aaab..8b1919bb 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -422,7 +423,7 @@ impl BlockManager { async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? 
{ - let time_msec = u64_from_be_bytes(&time_bytes[0..8]); + let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -705,13 +706,6 @@ impl BlockManagerLocked { } } -fn u64_from_be_bytes>(bytes: T) -> u64 { - assert!(bytes.as_ref().len() == 8); - let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes.as_ref()); - u64::from_be_bytes(x8) -} - /// Describes the state of the reference counter for a block #[derive(Clone, Copy, Debug)] enum RcEntry { diff --git a/src/table/data.rs b/src/table/data.rs index 13ec62bf..fb0b6d02 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -55,7 +55,7 @@ where .expect("Unable to open DB Merkle TODO tree"); let gc_todo = db - .open_tree(&format!("{}:gc_todo", name)) + .open_tree(&format!("{}:gc_todo_v2", name)) .expect("Unable to open DB tree"); Arc::new(Self { diff --git a/src/table/gc.rs b/src/table/gc.rs index cbd91b3a..98d7c95d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -13,6 +14,7 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -24,6 +26,11 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// GC delay for table entries: 1 day (24 hours) +// (the delay before the entry is added in the GC todo list +// and the moment the garbage collection actually happens) +const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); + pub(crate) struct TableGc { system: Arc, data: Arc>, @@ -72,35 +79,49 @@ where async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { match self.gc_loop_iter().await { - Ok(true) => { + Ok(None) => { // Stuff was done, loop immediately - continue; } - Ok(false) => { - // Nothing was done, sleep for some time (below) + Ok(Some(wait_delay)) => { + // Nothing was done, wait specified delay. + select! { + _ = tokio::time::sleep(wait_delay).fuse() => {}, + _ = must_exit.changed().fuse() => {}, + } } Err(e) => { warn!("({}) Error doing GC: {}", self.data.name, e); } } - select! { - _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } } } - async fn gc_loop_iter(&self) -> Result { + async fn gc_loop_iter(&self) -> Result, Error> { + let now = now_msec(); + let mut entries = vec![]; let mut excluded = vec![]; // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table - // This is detected and done in data.rs in update_entry + // (see update_entry in data.rs) for entry_kv in self.data.gc_todo.iter() { let (k, vhash) = entry_kv?; let mut todo_entry = GcTodoEntry::parse(&k, &vhash); + if todo_entry.deletion_time() > now { + if entries.is_empty() && excluded.is_empty() { + // If the earliest entry in the todo list shouldn't yet be processed, + // return a duration to wait in the loop + return Ok(Some(Duration::from_millis( + todo_entry.deletion_time() - now, + ))); + } else { + // Otherwise we have some entries to process, do a normal iteration. + break; + } + } + let vhash = Hash::try_from(&vhash[..]).unwrap(); // Check if the tombstone is still the current value of the entry. @@ -134,8 +155,9 @@ where // and for which they are still currently tombstones in the table. 
if entries.is_empty() { - // Nothing to do in this iteration - return Ok(false); + // Nothing to do in this iteration (no entries present) + // Wait for a default delay of 60 seconds + return Ok(Some(Duration::from_secs(60))); } debug!("({}) GC: doing {} items", self.data.name, entries.len()); @@ -181,7 +203,7 @@ where } if errs.is_empty() { - Ok(true) + Ok(None) } else { Err(Error::Message( errs.into_iter() @@ -189,19 +211,20 @@ where .collect::>() .join(", "), )) - .err_context("in try_send_and_delete:") + .err_context("in try_send_and_delete in table GC:") } } async fn try_send_and_delete( &self, nodes: Vec, - items: Vec, + mut items: Vec, ) -> Result<(), Error> { let n_items = items.len(); // Strategy: we first send all of the values to the remote nodes, - // to ensure that they are aware of the tombstone state. + // to ensure that they are aware of the tombstone state, + // and that the previous state was correctly overwritten // (if they have a newer state that overrides the tombstone, that's fine). // Second, once everyone is at least at the tombstone state, // we instruct everyone to delete the tombstone IF that is still their current state. @@ -209,13 +232,14 @@ where // tombstone in the CRDT lattice, and it will be propagated back to us at some point // (either just a regular update that hasn't reached us yet, or later when the // table is synced). + // Here, we store in updates all of the tombstones to send for step 1, // and in deletes the list of keys and hashes of value for step 2. let mut updates = vec![]; let mut deletes = vec![]; - for item in items { - updates.push(ByteBuf::from(item.value.unwrap())); - deletes.push((ByteBuf::from(item.key), item.value_hash)); + for item in items.iter_mut() { + updates.push(ByteBuf::from(item.value.take().unwrap())); + deletes.push((ByteBuf::from(item.key.clone()), item.value_hash)); } // Step 1: ensure everyone is at least at tombstone in CRDT lattice @@ -250,7 +274,7 @@ where .try_call_many( &self.endpoint, &nodes[..], - GcRpc::DeleteIfEqualHash(deletes.clone()), + GcRpc::DeleteIfEqualHash(deletes), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), @@ -260,24 +284,16 @@ where // GC has been successfull for all of these entries. // We now remove them all from our local table and from the GC todo list. 
- for (k, vhash) in deletes { + for item in items { self.data - .delete_if_equal_hash(&k[..], vhash) + .delete_if_equal_hash(&item.key[..], item.value_hash) .err_context("GC: local delete tombstones")?; - self.todo_remove_if_equal(&k[..], vhash) + item.remove_if_equal(&self.data.gc_todo) .err_context("GC: remove from todo list after successfull GC")?; } Ok(()) } - - fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> { - let _ = self - .data - .gc_todo - .compare_and_swap::<_, _, Vec>(key, Some(vhash), None)?; - Ok(()) - } } #[async_trait] @@ -295,7 +311,6 @@ where GcRpc::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; - self.todo_remove_if_equal(&key[..], *vhash)?; } Ok(GcRpc::Ok) } @@ -307,7 +322,16 @@ where /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled +/// +/// Format of an entry: +/// - key = 8 bytes: timestamp of tombstone +/// (used to implement GC delay) +/// n bytes: key in the main data table +/// - value = hash of the table entry to delete (the tombstone) +/// for verification purpose, because we don't want to delete +/// things that aren't tombstones pub(crate) struct GcTodoEntry { + tombstone_timestamp: u64, key: Vec, value_hash: Hash, value: Option>, @@ -319,6 +343,7 @@ impl GcTodoEntry { /// serialized value pub(crate) fn new(key: Vec, value_hash: Hash) -> Self { Self { + tombstone_timestamp: now_msec(), key, value_hash, value: None, @@ -328,7 +353,8 @@ impl GcTodoEntry { /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { Self { - key: sled_k.to_vec(), + tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), + key: sled_k[8..].to_vec(), value_hash: Hash::try_from(sled_v).unwrap(), value: None, } @@ -336,7 +362,7 @@ impl GcTodoEntry { /// Saves the GcTodoEntry in the gc_todo tree pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { - gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?; + gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -347,10 +373,22 @@ impl GcTodoEntry { /// what we have to do is still the same pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { let _ = gc_todo_tree.compare_and_swap::<_, _, Vec>( - &self.key[..], + &self.todo_table_key()[..], Some(self.value_hash), None, )?; Ok(()) } + + fn todo_table_key(&self) -> Vec { + [ + &u64::to_be_bytes(self.tombstone_timestamp)[..], + &self.key[..], + ] + .concat() + } + + fn deletion_time(&self) -> u64 { + self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64 + } } -- 2.40.1 From 08b1e8a7ea30b3004065f9a646f8f795458bda44 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 8 Nov 2021 16:03:15 +0100 Subject: [PATCH 4/4] Move design draft to separate file; write about GC in internals --- doc/book/src/SUMMARY.md | 1 + doc/book/src/design/design_draft.md | 162 +++++++++++++++++++ doc/book/src/design/internals.md | 231 ++++++++++------------------ 3 files changed, 247 insertions(+), 147 deletions(-) create mode 100644 doc/book/src/design/design_draft.md diff --git a/doc/book/src/SUMMARY.md b/doc/book/src/SUMMARY.md index bdc35135..2095523f 100644 --- a/doc/book/src/SUMMARY.md +++ b/doc/book/src/SUMMARY.md @@ -29,6 +29,7 @@ - [Design](./design/index.md) - [Related Work](./design/related_work.md) - 
[Internals](./design/internals.md) + - [Design draft](./design/design_draft.md) - [Development](./development/index.md) - [Setup your environment](./development/devenv.md) diff --git a/doc/book/src/design/design_draft.md b/doc/book/src/design/design_draft.md new file mode 100644 index 00000000..06ed46bd --- /dev/null +++ b/doc/book/src/design/design_draft.md @@ -0,0 +1,162 @@ +# Design draft + +**WARNING: this documentation is a design draft which was written before Garage's actual implementation. +The general principle are similar, but details have not been updated.** + + +#### Modules + +- `membership/`: configuration, membership management (gossip of node's presence and status), ring generation --> what about Serf (used by Consul/Nomad) : https://www.serf.io/? Seems a huge library with many features so maybe overkill/hard to integrate +- `metadata/`: metadata management +- `blocks/`: block management, writing, GC and rebalancing +- `internal/`: server to server communication (HTTP server and client that reuses connections, TLS if we want, etc) +- `api/`: S3 API +- `web/`: web management interface + +#### Metadata tables + +**Objects:** + +- *Hash key:* Bucket name (string) +- *Sort key:* Object key (string) +- *Sort key:* Version timestamp (int) +- *Sort key:* Version UUID (string) +- Complete: bool +- Inline: bool, true for objects < threshold (say 1024) +- Object size (int) +- Mime type (string) +- Data for inlined objects (blob) +- Hash of first block otherwise (string) + +*Having only a hash key on the bucket name will lead to storing all file entries of this table for a specific bucket on a single node. At the same time, it is the only way I see to rapidly being able to list all bucket entries...* + +**Blocks:** + +- *Hash key:* Version UUID (string) +- *Sort key:* Offset of block in total file (int) +- Hash of data block (string) + +A version is defined by the existence of at least one entry in the blocks table for a certain version UUID. +We must keep the following invariant: if a version exists in the blocks table, it has to be referenced in the objects table. +We explicitly manage concurrent versions of an object: the version timestamp and version UUID columns are index columns, thus we may have several concurrent versions of an object. +Important: before deleting an older version from the objects table, we must make sure that we did a successfull delete of the blocks of that version from the blocks table. + +Thus, the workflow for reading an object is as follows: + +1. Check permissions (LDAP) +2. Read entry in object table. If data is inline, we have its data, stop here. + -> if several versions, take newest one and launch deletion of old ones in background +3. Read first block from cluster. If size <= 1 block, stop here. +4. Simultaneously with previous step, if size > 1 block: query the Blocks table for the IDs of the next blocks +5. Read subsequent blocks from cluster + +Workflow for PUT: + +1. Check write permission (LDAP) +2. Select a new version UUID +3. Write a preliminary entry for the new version in the objects table with complete = false +4. Send blocks to cluster and write entries in the blocks table +5. Update the version with complete = true and all of the accurate information (size, etc) +6. Return success to the user +7. Launch a background job to check and delete older versions + +Workflow for DELETE: + +1. Check write permission (LDAP) +2. Get current version (or versions) in object table +3. 
Do the deletion of those versions NOT IN A BACKGROUND JOB THIS TIME +4. Return succes to the user if we were able to delete blocks from the blocks table and entries from the object table + +To delete a version: + +1. List the blocks from Cassandra +2. For each block, delete it from cluster. Don't care if some deletions fail, we can do GC. +3. Delete all of the blocks from the blocks table +4. Finally, delete the version from the objects table + +Known issue: if someone is reading from a version that we want to delete and the object is big, the read might be interrupted. I think it is ok to leave it like this, we just cut the connection if data disappears during a read. + +("Soit P un problème, on s'en fout est une solution à ce problème") + +#### Block storage on disk + +**Blocks themselves:** + +- file path = /blobs/(first 3 hex digits of hash)/(rest of hash) + +**Reverse index for GC & other block-level metadata:** + +- file path = /meta/(first 3 hex digits of hash)/(rest of hash) +- map block hash -> set of version UUIDs where it is referenced + +Usefull metadata: + +- list of versions that reference this block in the Casandra table, so that we can do GC by checking in Cassandra that the lines still exist +- list of other nodes that we know have acknowledged a write of this block, usefull in the rebalancing algorithm + +Write strategy: have a single thread that does all write IO so that it is serialized (or have several threads that manage independent parts of the hash space). When writing a blob, write it to a temporary file, close, then rename so that a concurrent read gets a consistent result (either not found or found with whole content). + +Read strategy: the only read operation is get(hash) that returns either the data or not found (can do a corruption check as well and return corrupted state if it is the case). Can be done concurrently with writes. + +**Internal API:** + +- get(block hash) -> ok+data/not found/corrupted +- put(block hash & data, version uuid + offset) -> ok/error +- put with no data(block hash, version uuid + offset) -> ok/not found plz send data/error +- delete(block hash, version uuid + offset) -> ok/error + +GC: when last ref is deleted, delete block. +Long GC procedure: check in Cassandra that version UUIDs still exist and references this block. + +Rebalancing: takes as argument the list of newly added nodes. + +- List all blocks that we have. For each block: +- If it hits a newly introduced node, send it to them. + Use put with no data first to check if it has to be sent to them already or not. + Use a random listing order to avoid race conditions (they do no harm but we might have two nodes sending the same thing at the same time thus wasting time). +- If it doesn't hit us anymore, delete it and its reference list. + +Only one balancing can be running at a same time. It can be restarted at the beginning with new parameters. 
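The write strategy sketched above (write the blob to a temporary file, close it, then rename) is what gives concurrent readers an all-or-nothing view in this draft. Below is a minimal Rust sketch of that idea, using illustrative names only; it is a reading aid for the draft, not Garage's actual block-manager API.

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Sketch of the draft's write strategy: write the blob to a temporary file,
/// flush it to disk, then rename it into place, so that a concurrent reader
/// sees either no file at all or the complete block, never a partial write.
/// (Illustrative code; function and path names are hypothetical.)
fn write_block_atomically(final_path: &Path, data: &[u8]) -> std::io::Result<()> {
    let tmp_path = final_path.with_extension("tmp");
    {
        let mut f = fs::File::create(&tmp_path)?;
        f.write_all(data)?;
        f.sync_all()?; // ensure the data is durable before it becomes visible
    }
    // rename() is atomic on POSIX filesystems, which gives the all-or-nothing property.
    fs::rename(&tmp_path, final_path)?;
    Ok(())
}
```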
+ +#### Membership management + +Two sets of nodes: + +- set of nodes from which a ping was recently received, with status: number of stored blocks, request counters, error counters, GC%, rebalancing% + (eviction from this set after say 30 seconds without ping) +- set of nodes that are part of the system, explicitly modified by the operator using the web UI (persisted to disk), + is a CRDT using a version number for the value of the whole set + +Thus, three states for nodes: + +- healthy: in both sets +- missing: not pingable but part of desired cluster +- unused/draining: currently present but not part of the desired cluster, empty = if contains nothing, draining = if still contains some blocks + +Membership messages between nodes: + +- ping with current state + hash of current membership info -> reply with same info +- send&get back membership info (the ids of nodes that are in the two sets): used when no local membership change in a long time and membership info hash discrepancy detected with first message (passive membership fixing with full CRDT gossip) +- inform of newly pingable node(s) -> no result, when receive new info repeat to all (reliable broadcast) +- inform of operator membership change -> no result, when receive new info repeat to all (reliable broadcast) + +Ring: generated from the desired set of nodes, however when doing read/writes on the ring, skip nodes that are known to be not pingable. +The tokens are generated in a deterministic fashion from node IDs (hash of node id + token number from 1 to K). +Number K of tokens per node: decided by the operator & stored in the operator's list of nodes CRDT. Default value proposal: with node status information also broadcast disk total size and free space, and propose a default number of tokens equal to 80%Free space / 10Gb. (this is all user interface) + + +#### Constants + +- Block size: around 1MB ? --> Exoscale use 16MB chunks +- Number of tokens in the hash ring: one every 10Gb of allocated storage +- Threshold for storing data directly in Cassandra objects table: 1kb bytes (maybe up to 4kb?) +- Ping timeout (time after which a node is registered as unresponsive/missing): 30 seconds +- Ping interval: 10 seconds +- ?? + +#### Links + +- CDC: +- Erasure coding: +- [Openstack Storage Concepts](https://docs.openstack.org/arch-design/design-storage/design-storage-concepts.html) +- [RADOS](https://ceph.com/wp-content/uploads/2016/08/weil-rados-pdsw07.pdf) diff --git a/doc/book/src/design/internals.md b/doc/book/src/design/internals.md index e712ae07..255335fa 100644 --- a/doc/book/src/design/internals.md +++ b/doc/book/src/design/internals.md @@ -1,158 +1,95 @@ -**WARNING: this documentation is more a "design draft", which was written before Garage's actual implementation. The general principle is similar but details have not yet been updated.** +# Internals -#### Modules +## Overview -- `membership/`: configuration, membership management (gossip of node's presence and status), ring generation --> what about Serf (used by Consul/Nomad) : https://www.serf.io/? 
Seems a huge library with many features so maybe overkill/hard to integrate -- `metadata/`: metadata management -- `blocks/`: block management, writing, GC and rebalancing -- `internal/`: server to server communication (HTTP server and client that reuses connections, TLS if we want, etc) -- `api/`: S3 API -- `web/`: web management interface +TODO: write this section -#### Metadata tables +- The Dynamo ring -**Objects:** +- CRDTs -- *Hash key:* Bucket name (string) -- *Sort key:* Object key (string) -- *Sort key:* Version timestamp (int) -- *Sort key:* Version UUID (string) -- Complete: bool -- Inline: bool, true for objects < threshold (say 1024) -- Object size (int) -- Mime type (string) -- Data for inlined objects (blob) -- Hash of first block otherwise (string) +- Consistency model of Garage tables -*Having only a hash key on the bucket name will lead to storing all file entries of this table for a specific bucket on a single node. At the same time, it is the only way I see to rapidly being able to list all bucket entries...* - -**Blocks:** - -- *Hash key:* Version UUID (string) -- *Sort key:* Offset of block in total file (int) -- Hash of data block (string) - -A version is defined by the existence of at least one entry in the blocks table for a certain version UUID. -We must keep the following invariant: if a version exists in the blocks table, it has to be referenced in the objects table. -We explicitly manage concurrent versions of an object: the version timestamp and version UUID columns are index columns, thus we may have several concurrent versions of an object. -Important: before deleting an older version from the objects table, we must make sure that we did a successfull delete of the blocks of that version from the blocks table. - -Thus, the workflow for reading an object is as follows: - -1. Check permissions (LDAP) -2. Read entry in object table. If data is inline, we have its data, stop here. - -> if several versions, take newest one and launch deletion of old ones in background -3. Read first block from cluster. If size <= 1 block, stop here. -4. Simultaneously with previous step, if size > 1 block: query the Blocks table for the IDs of the next blocks -5. Read subsequent blocks from cluster - -Workflow for PUT: - -1. Check write permission (LDAP) -2. Select a new version UUID -3. Write a preliminary entry for the new version in the objects table with complete = false -4. Send blocks to cluster and write entries in the blocks table -5. Update the version with complete = true and all of the accurate information (size, etc) -6. Return success to the user -7. Launch a background job to check and delete older versions - -Workflow for DELETE: - -1. Check write permission (LDAP) -2. Get current version (or versions) in object table -3. Do the deletion of those versions NOT IN A BACKGROUND JOB THIS TIME -4. Return succes to the user if we were able to delete blocks from the blocks table and entries from the object table - -To delete a version: - -1. List the blocks from Cassandra -2. For each block, delete it from cluster. Don't care if some deletions fail, we can do GC. -3. Delete all of the blocks from the blocks table -4. Finally, delete the version from the objects table - -Known issue: if someone is reading from a version that we want to delete and the object is big, the read might be interrupted. I think it is ok to leave it like this, we just cut the connection if data disappears during a read. 
- -("Soit P un problème, on s'en fout est une solution à ce problème") - -#### Block storage on disk - -**Blocks themselves:** - -- file path = /blobs/(first 3 hex digits of hash)/(rest of hash) - -**Reverse index for GC & other block-level metadata:** - -- file path = /meta/(first 3 hex digits of hash)/(rest of hash) -- map block hash -> set of version UUIDs where it is referenced - -Usefull metadata: - -- list of versions that reference this block in the Casandra table, so that we can do GC by checking in Cassandra that the lines still exist -- list of other nodes that we know have acknowledged a write of this block, usefull in the rebalancing algorithm - -Write strategy: have a single thread that does all write IO so that it is serialized (or have several threads that manage independent parts of the hash space). When writing a blob, write it to a temporary file, close, then rename so that a concurrent read gets a consistent result (either not found or found with whole content). - -Read strategy: the only read operation is get(hash) that returns either the data or not found (can do a corruption check as well and return corrupted state if it is the case). Can be done concurrently with writes. - -**Internal API:** - -- get(block hash) -> ok+data/not found/corrupted -- put(block hash & data, version uuid + offset) -> ok/error -- put with no data(block hash, version uuid + offset) -> ok/not found plz send data/error -- delete(block hash, version uuid + offset) -> ok/error - -GC: when last ref is deleted, delete block. -Long GC procedure: check in Cassandra that version UUIDs still exist and references this block. - -Rebalancing: takes as argument the list of newly added nodes. - -- List all blocks that we have. For each block: -- If it hits a newly introduced node, send it to them. - Use put with no data first to check if it has to be sent to them already or not. - Use a random listing order to avoid race conditions (they do no harm but we might have two nodes sending the same thing at the same time thus wasting time). -- If it doesn't hit us anymore, delete it and its reference list. - -Only one balancing can be running at a same time. It can be restarted at the beginning with new parameters. 
- -#### Membership management - -Two sets of nodes: - -- set of nodes from which a ping was recently received, with status: number of stored blocks, request counters, error counters, GC%, rebalancing% - (eviction from this set after say 30 seconds without ping) -- set of nodes that are part of the system, explicitly modified by the operator using the web UI (persisted to disk), - is a CRDT using a version number for the value of the whole set - -Thus, three states for nodes: - -- healthy: in both sets -- missing: not pingable but part of desired cluster -- unused/draining: currently present but not part of the desired cluster, empty = if contains nothing, draining = if still contains some blocks - -Membership messages between nodes: - -- ping with current state + hash of current membership info -> reply with same info -- send&get back membership info (the ids of nodes that are in the two sets): used when no local membership change in a long time and membership info hash discrepancy detected with first message (passive membership fixing with full CRDT gossip) -- inform of newly pingable node(s) -> no result, when receive new info repeat to all (reliable broadcast) -- inform of operator membership change -> no result, when receive new info repeat to all (reliable broadcast) - -Ring: generated from the desired set of nodes, however when doing read/writes on the ring, skip nodes that are known to be not pingable. -The tokens are generated in a deterministic fashion from node IDs (hash of node id + token number from 1 to K). -Number K of tokens per node: decided by the operator & stored in the operator's list of nodes CRDT. Default value proposal: with node status information also broadcast disk total size and free space, and propose a default number of tokens equal to 80%Free space / 10Gb. (this is all user interface) +See this presentation (in French) for some first information: + -#### Constants +## Garbage collection -- Block size: around 1MB ? --> Exoscale use 16MB chunks -- Number of tokens in the hash ring: one every 10Gb of allocated storage -- Threshold for storing data directly in Cassandra objects table: 1kb bytes (maybe up to 4kb?) -- Ping timeout (time after which a node is registered as unresponsive/missing): 30 seconds -- Ping interval: 10 seconds -- ?? +A faulty garbage collection procedure has been the cause of +[critical bug #39](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/39). +This precise bug was fixed in the code, however there are potentially more +general issues with the garbage collector being too eager and deleting things +too early. This has been the subject of +[PR #135](https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/135). +This section summarizes the discussions on this topic. -#### Links +Rationale: we want to ensure Garage's safety by making sure things don't get +deleted from disk if they are still needed. Two aspects are involved in this. + +### 1. Garbage collection of table entries (in `meta/` directory) + +The `Entry` trait used for table entries (defined in `tables/schema.rs`) +defines a function `is_tombstone()` that returns `true` if that entry +represents an entry that is deleted in the table. CRDT semantics by default +keep all tombstones, because they are necessary for reconciliation: if node A +has a tombstone that supersedes a value `x`, and node B has value `x`, A has to +keep the tombstone in memory so that the value `x` can be properly deleted at +node `B`. 
Otherwise, due to the CRDT reconciliation rule, the value `x` from B +would flow back to A and a deleted item would reappear in the system. + +Here, we have some control on the nodes involved in storing Garage data. +Therefore we have a garbage collector that is able to delete tombstones UNDER +CERTAIN CONDITIONS. This garbage collector is implemented in `table/gc.rs`. To +delete a tombstone, the following condition has to be met: + +- All nodes responsible for storing this entry are aware of the existence of + the tombstone, i.e. they cannot hold another version of the entry that is + superseeded by the tombstone. This ensures that deleting the tombstone is + safe and that no deleted value will come back in the system. + +Garage makes use of Sled's atomic operations (such as compare-and-swap and +transactions) to ensure that only tombstones that have been correctly +propagated to other nodes are ever deleted from the local entry tree. + +This GC is safe in the following sense: no non-tombstone data is ever deleted +from Garage tables. + +**However**, there is an issue with the way this interacts with data +rebalancing in the case when a partition is moving between nodes. If a node has +some data of a partition for which it is not responsible, it has to offload it. +However that offload process takes some time. In that interval, the GC does not +check with that node if it has the tombstone before deleting the tombstone, so +perhaps it doesn't have it and when the offload finally happens, old data comes +back in the system. + +**PR 135 mostly fixes this** by implementing a 24-hour delay before anything is +garbage collected in a table. This works under the assumption that rebalances +that follow data shuffling terminate in less than 24 hours. + +**However**, in distributed systems, it is generally considered a bad practice +to make assumptions that information propagates in a certain time interval: +this consists in making a synchrony assumption, meaning that we are basically +assuming a computing model that has much stronger properties than otherwise. To +maximize the applicability of Garage, we would like to remove this assumption, +and implement a system where time does not play a role. To do this, we would +need to find a way to safely disable the GC when data is being shuffled around, +and safely detect that the shuffling has terminated and thus the GC can be +resumed. This introduces some complexity to the protocol and hasn't been +tackled yet. + +### 2. Garbage collection of data blocks (in `data/` directory) + +Blocks in the data directory are reference-counted. In Garage versions before +PR #135, blocks could get deleted from local disk as soon as their reference +counter reached zero. We had a mechanism to not trigger this immediately at the +rc-reaches-zero event, but the cleanup could be triggered by other means (for +example by a block repair operation...). PR #135 added a safety measure so that +blocks never get deleted in a 10 minute interval following the time when the RC +reaches zero. This is a measure to make impossible race conditions such as #39. +We would have liked to use a larger delay (e.g. 24 hours), but in the case of a +rebalance of data, this would have led to the disk utilization to explode +during the rebalancing, only to shrink again after 24 hours. The 10-minute +delay is a compromise that gives good security while not having this problem of +disk space explosion on rebalance. 
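The mechanism behind this 10-minute window is the `RcEntry` state machine introduced in patch 2 of this series (`src/model/block.rs`). The following is a condensed sketch, simplified for illustration: the real code reads the clock and the `BLOCK_GC_DELAY` constant itself, and also handles the backwards-compatible on-disk encoding.

```rust
/// Condensed view of the block reference-counter states from patch 2
/// (simplified: timestamps are passed explicitly here for clarity).
enum RcEntry {
    /// The block has `count` > 0 references.
    Present { count: u64 },
    /// The refcount reached zero; deletion is only allowed once the current
    /// time (in ms since the Unix epoch) exceeds `at_time`.
    Deletable { at_time: u64 },
    /// No entry at all: the block can be deleted immediately.
    Absent,
}

impl RcEntry {
    fn decrement(self, now_msec: u64, gc_delay_msec: u64) -> Self {
        match self {
            RcEntry::Present { count } if count > 1 => RcEntry::Present { count: count - 1 },
            // The counter just reached zero: do not allow deletion right away,
            // only after the configured delay (10 minutes for blocks).
            RcEntry::Present { .. } => RcEntry::Deletable {
                at_time: now_msec + gc_delay_msec,
            },
            other => other,
        }
    }

    fn is_deletable(&self, now_msec: u64) -> bool {
        match self {
            RcEntry::Present { .. } => false,
            RcEntry::Deletable { at_time } => now_msec > *at_time,
            RcEntry::Absent => true,
        }
    }
}
```

The resync task and `delete_if_unneeded` only remove a block file once `is_deletable()` returns true, which is what enforces the delay described above.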
-- CDC: -- Erasure coding: -- [Openstack Storage Concepts](https://docs.openstack.org/arch-design/design-storage/design-storage-concepts.html) -- [RADOS](https://ceph.com/wp-content/uploads/2016/08/weil-rados-pdsw07.pdf) -- 2.40.1