Improvements to garbage collector (fix #39) #135

Merged
lx merged 4 commits from cleanups into main 2021-11-09 13:03:35 +00:00
8 changed files with 621 additions and 251 deletions

View file

@@ -29,6 +29,7 @@
 - [Design](./design/index.md)
   - [Related Work](./design/related_work.md)
   - [Internals](./design/internals.md)
+  - [Design draft](./design/design_draft.md)
 - [Development](./development/index.md)
   - [Setup your environment](./development/devenv.md)

View file

@@ -0,0 +1,162 @@
# Design draft
**WARNING: this documentation is a design draft which was written before Garage's actual implementation.
The general principles are similar, but details have not been updated.**
#### Modules
- `membership/`: configuration, membership management (gossip of node's presence and status), ring generation --> what about Serf (used by Consul/Nomad): https://www.serf.io/? It seems to be a huge library with many features, so maybe overkill/hard to integrate
- `metadata/`: metadata management
- `blocks/`: block management, writing, GC and rebalancing
- `internal/`: server to server communication (HTTP server and client that reuses connections, TLS if we want, etc)
- `api/`: S3 API
- `web/`: web management interface
#### Metadata tables
**Objects:**
- *Hash key:* Bucket name (string)
- *Sort key:* Object key (string)
- *Sort key:* Version timestamp (int)
- *Sort key:* Version UUID (string)
- Complete: bool
- Inline: bool, true for objects < threshold (say 1024)
- Object size (int)
- Mime type (string)
- Data for inlined objects (blob)
- Hash of first block otherwise (string)
*Having only a hash key on the bucket name will lead to storing all file entries of this table for a specific bucket on a single node. At the same time, it is the only way I see to be able to rapidly list all bucket entries...*
**Blocks:**
- *Hash key:* Version UUID (string)
- *Sort key:* Offset of block in total file (int)
- Hash of data block (string)
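
For concreteness, the two entries could be sketched in Rust roughly as follows (field names and types here are illustrative only, not Garage's actual schema):

```rust
// Illustrative sketch only; not Garage's actual schema.

/// One row of the objects table, identified by
/// (bucket, object key, version timestamp, version UUID).
struct ObjectEntry {
    bucket: String,         // hash key
    key: String,            // sort key
    version_timestamp: u64, // sort key
    version_uuid: String,   // sort key
    complete: bool,
    size: u64,
    mime_type: String,
    data: ObjectData,
}

/// Either the object's data itself (for small, inlined objects)
/// or the hash of its first block.
enum ObjectData {
    Inline(Vec<u8>),
    FirstBlock(String),
}

/// One row of the blocks table, identified by (version UUID, offset).
struct BlockEntry {
    version_uuid: String, // hash key
    offset: u64,          // sort key: offset of this block in the whole object
    block_hash: String,
}
```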
A version is defined by the existence of at least one entry in the blocks table for a certain version UUID.
We must keep the following invariant: if a version exists in the blocks table, it has to be referenced in the objects table.
We explicitly manage concurrent versions of an object: the version timestamp and version UUID columns are index columns, thus we may have several concurrent versions of an object.
Important: before deleting an older version from the objects table, we must make sure that we did a successful delete of the blocks of that version from the blocks table.
Thus, the workflow for reading an object is as follows:
1. Check permissions (LDAP)
2. Read entry in object table. If data is inline, we have its data, stop here.
-> if several versions, take newest one and launch deletion of old ones in background
3. Read first block from cluster. If size <= 1 block, stop here.
4. Simultaneously with previous step, if size > 1 block: query the Blocks table for the IDs of the next blocks
5. Read subsequent blocks from cluster
Workflow for PUT:
1. Check write permission (LDAP)
2. Select a new version UUID
3. Write a preliminary entry for the new version in the objects table with complete = false
4. Send blocks to cluster and write entries in the blocks table
5. Update the version with complete = true and all of the accurate information (size, etc)
6. Return success to the user
7. Launch a background job to check and delete older versions
Workflow for DELETE:
1. Check write permission (LDAP)
2. Get current version (or versions) in object table
3. Do the deletion of those versions NOT IN A BACKGROUND JOB THIS TIME
4. Return success to the user if we were able to delete blocks from the blocks table and entries from the object table
To delete a version:
1. List the blocks from Cassandra
2. For each block, delete it from cluster. Don't care if some deletions fail, we can do GC.
3. Delete all of the blocks from the blocks table
4. Finally, delete the version from the objects table
Known issue: if someone is reading from a version that we want to delete and the object is big, the read might be interrupted. I think it is ok to leave it like this, we just cut the connection if data disappears during a read.
("Soit P un problème, on s'en fout est une solution à ce problème")
#### Block storage on disk
**Blocks themselves:**
- file path = /blobs/(first 3 hex digits of hash)/(rest of hash)
**Reverse index for GC & other block-level metadata:**
- file path = /meta/(first 3 hex digits of hash)/(rest of hash)
- map block hash -> set of version UUIDs where it is referenced
Useful metadata:
- list of versions that reference this block in the Cassandra table, so that we can do GC by checking in Cassandra that the lines still exist
- list of other nodes that we know have acknowledged a write of this block, useful in the rebalancing algorithm
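
As a rough illustration of this layout (assuming the hash is handled as a lowercase hex string; a real implementation would work on binary hashes):

```rust
use std::path::PathBuf;

/// Illustrative only: derive the on-disk locations of a block and of its
/// reverse-index metadata file from the hex form of its hash, following
/// the /blobs/xxx/<rest> and /meta/xxx/<rest> layout described above.
fn block_paths(data_dir: &str, hash_hex: &str) -> (PathBuf, PathBuf) {
    let (prefix, rest) = hash_hex.split_at(3); // first 3 hex digits
    let blob: PathBuf = [data_dir, "blobs", prefix, rest].iter().collect();
    let meta: PathBuf = [data_dir, "meta", prefix, rest].iter().collect();
    (blob, meta)
}

fn main() {
    let (blob, meta) = block_paths(
        "/var/lib/garage",
        "0a1b2c3d4e5f60718293a4b5c6d7e8f90a1b2c3d4e5f60718293a4b5c6d7e8f9",
    );
    println!("{}", blob.display());
    println!("{}", meta.display());
}
```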
Write strategy: have a single thread that does all write IO so that it is serialized (or have several threads that manage independent parts of the hash space). When writing a blob, write it to a temporary file, close, then rename so that a concurrent read gets a consistent result (either not found or found with whole content).
Read strategy: the only read operation is get(hash) that returns either the data or not found (can do a corruption check as well and return corrupted state if it is the case). Can be done concurrently with writes.
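
A minimal sketch of the write-then-rename strategy described here (synchronous standard library only; a real implementation would be async and also sync the parent directory):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Write `data` to `path` so that a concurrent reader either sees the
/// complete content or nothing: write to a temporary file in the same
/// directory, flush it to disk, then atomically rename it into place.
fn write_block_atomically(path: &Path, data: &[u8]) -> std::io::Result<()> {
    let tmp_path = path.with_extension("tmp");
    {
        let mut f = fs::File::create(&tmp_path)?;
        f.write_all(data)?;
        f.sync_all()?; // make sure the content is on disk before the rename
    }
    fs::rename(&tmp_path, path)?;
    Ok(())
}
```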
**Internal API:**
- get(block hash) -> ok+data/not found/corrupted
- put(block hash & data, version uuid + offset) -> ok/error
- put with no data(block hash, version uuid + offset) -> ok/not found plz send data/error
- delete(block hash, version uuid + offset) -> ok/error
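
Sketched as plain request/response enums (purely illustrative; not the actual RPC types used in Garage):

```rust
/// Illustrative request messages for the internal block API
enum BlockRequest {
    Get { hash: [u8; 32] },
    Put { hash: [u8; 32], data: Vec<u8>, version: [u8; 16], offset: u64 },
    PutNoData { hash: [u8; 32], version: [u8; 16], offset: u64 },
    Delete { hash: [u8; 32], version: [u8; 16], offset: u64 },
}

/// Illustrative responses
enum BlockResponse {
    Ok,
    Data(Vec<u8>),
    NotFound,
    NeedBlockData, // reply to PutNoData when the block must actually be sent
    Corrupted,
    Error(String),
}
```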
GC: when last ref is deleted, delete block.
Long GC procedure: check in Cassandra that version UUIDs still exist and references this block.
Rebalancing: takes as argument the list of newly added nodes.
- List all blocks that we have. For each block:
- If it hits a newly introduced node, send it to them.
Use put with no data first to check whether it still needs to be sent to them or not.
Use a random listing order to avoid race conditions (they do no harm, but we might have two nodes sending the same thing at the same time, thus wasting time).
- If it doesn't hit us anymore, delete it and its reference list.
Only one rebalancing can be running at the same time. It can be restarted from the beginning with new parameters.
#### Membership management
Two sets of nodes:
- set of nodes from which a ping was recently received, with status: number of stored blocks, request counters, error counters, GC%, rebalancing%
(eviction from this set after say 30 seconds without ping)
- set of nodes that are part of the system, explicitly modified by the operator using the web UI (persisted to disk),
is a CRDT using a version number for the value of the whole set
Thus, three states for nodes:
- healthy: in both sets
- missing: not pingable but part of desired cluster
- unused/draining: currently present but not part of the desired cluster; unused if it contains nothing, draining if it still contains some blocks
Membership messages between nodes:
- ping with current state + hash of current membership info -> reply with same info
- send&get back membership info (the ids of nodes that are in the two sets): used when no local membership change in a long time and membership info hash discrepancy detected with first message (passive membership fixing with full CRDT gossip)
- inform of newly pingable node(s) -> no result, when receive new info repeat to all (reliable broadcast)
- inform of operator membership change -> no result, when receive new info repeat to all (reliable broadcast)
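
A rough sketch of these four messages as a Rust enum (names and payloads are illustrative, not an actual protocol definition):

```rust
/// Placeholder types for this sketch.
type NodeId = [u8; 32];

struct NodeStatus {
    stored_blocks: u64,
    // request counters, error counters, GC%, rebalancing% ...
}

/// Illustrative only: the four membership messages described above.
enum MembershipMessage {
    /// Ping with current node status and a hash of the local membership info;
    /// the receiver replies with the same kind of information.
    Ping { status: NodeStatus, membership_hash: [u8; 32] },
    /// Exchange full membership info (IDs of nodes in both sets) when a hash
    /// discrepancy was detected (passive membership fixing, full CRDT gossip).
    PullMembership,
    /// Advertise newly pingable node(s); repeated to all on reception
    /// (reliable broadcast).
    AdvertiseNewNodes(Vec<NodeId>),
    /// Advertise an operator change to the desired node set; also repeated
    /// to all on reception (reliable broadcast).
    AdvertiseConfigChange { version: u64, desired_nodes: Vec<NodeId> },
}
```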
Ring: generated from the desired set of nodes, however when doing read/writes on the ring, skip nodes that are known to be not pingable.
The tokens are generated in a deterministic fashion from node IDs (hash of node id + token number from 1 to K).
Number K of tokens per node: decided by the operator & stored in the operator's list of nodes CRDT. Default value proposal: with the node status information, also broadcast total disk size and free space, and propose a default number of tokens equal to 80% × free space / 10 GB. (this is all user interface)
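
For concreteness, a deterministic token derivation could look like this (using the standard library hasher as a stand-in for whatever hash function is actually chosen):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: derive K ring tokens deterministically from a node ID,
/// as hash(node_id || token_number) for token_number in 1..=K.
/// A real implementation would use a proper cryptographic hash.
fn node_tokens(node_id: &str, k: u32) -> Vec<u64> {
    (1..=k)
        .map(|i| {
            let mut h = DefaultHasher::new();
            node_id.hash(&mut h);
            i.hash(&mut h);
            h.finish()
        })
        .collect()
}

fn main() {
    // With the proposed default: roughly one token per 10 GB of allocated storage.
    println!("{:?}", node_tokens("node-1234", 8));
}
```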
#### Constants
- Block size: around 1 MB? --> Exoscale uses 16 MB chunks
- Number of tokens in the hash ring: one every 10 GB of allocated storage
- Threshold for storing data directly in the Cassandra objects table: 1 KB (maybe up to 4 KB?)
- Ping timeout (time after which a node is registered as unresponsive/missing): 30 seconds
- Ping interval: 10 seconds
- ??
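
Written out as Rust constants for concreteness, using the draft's tentative values (not the values Garage actually ended up using):

```rust
use std::time::Duration;

// Tentative values from this draft, not Garage's actual constants.
const BLOCK_SIZE: usize = 1024 * 1024;                   // ~1 MB blocks (Exoscale uses 16 MB chunks)
const STORAGE_PER_TOKEN: u64 = 10 * 1024 * 1024 * 1024;  // one ring token per 10 GB of allocated storage
const INLINE_THRESHOLD: usize = 1024;                    // inline objects up to 1 KB (maybe 4 KB)
const PING_TIMEOUT: Duration = Duration::from_secs(30);  // node considered unresponsive/missing
const PING_INTERVAL: Duration = Duration::from_secs(10);
```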
#### Links
- CDC: <https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf>
- Erasure coding: <http://web.eecs.utk.edu/~jplank/plank/papers/CS-08-627.html>
- [Openstack Storage Concepts](https://docs.openstack.org/arch-design/design-storage/design-storage-concepts.html)
- [RADOS](https://ceph.com/wp-content/uploads/2016/08/weil-rados-pdsw07.pdf)

View file

@@ -1,158 +1,95 @@
[The previous content of this file — the design draft reproduced above, now moved to `design_draft.md` — is removed and replaced by the following.]

# Internals

## Overview

TODO: write this section

- The Dynamo ring
- CRDTs
- Consistency model of Garage tables

See this presentation (in French) for some first information:
<https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/doc/talks/2020-12-02_wide-team/talk.pdf>

## Garbage collection

A faulty garbage collection procedure has been the cause of
[critical bug #39](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/39).
This precise bug was fixed in the code, however there are potentially more
general issues with the garbage collector being too eager and deleting things
too early. This has been the subject of
[PR #135](https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/135).
This section summarizes the discussions on this topic.

Rationale: we want to ensure Garage's safety by making sure things don't get
deleted from disk if they are still needed. Two aspects are involved in this.
### 1. Garbage collection of table entries (in `meta/` directory)
The `Entry` trait used for table entries (defined in `tables/schema.rs`)
defines a function `is_tombstone()` that returns `true` if that entry
represents an entry that is deleted in the table. CRDT semantics by default
keep all tombstones, because they are necessary for reconciliation: if node A
has a tombstone that supersedes a value `x`, and node B has value `x`, A has to
keep the tombstone in memory so that the value `x` can be properly deleted at
node `B`. Otherwise, due to the CRDT reconciliation rule, the value `x` from B
would flow back to A and a deleted item would reappear in the system.
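
A reduced sketch of the mechanism (the actual `Entry` trait in `table/schema.rs` has more methods and type parameters; this only illustrates the tombstone flag):

```rust
/// Reduced sketch of a table entry: by default an entry is not a tombstone;
/// schemas whose entries can represent deletions override this.
trait Entry {
    fn is_tombstone(&self) -> bool {
        false
    }
}

/// Illustrative example, not Garage's real type: an entry that carries
/// an explicit deletion marker.
struct ExampleEntry {
    deleted: bool,
}

impl Entry for ExampleEntry {
    fn is_tombstone(&self) -> bool {
        self.deleted
    }
}
```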
Here, we have some control on the nodes involved in storing Garage data.
Therefore we have a garbage collector that is able to delete tombstones UNDER
CERTAIN CONDITIONS. This garbage collector is implemented in `table/gc.rs`. To
delete a tombstone, the following condition has to be met:
- All nodes responsible for storing this entry are aware of the existence of
the tombstone, i.e. they cannot hold another version of the entry that is
superseded by the tombstone. This ensures that deleting the tombstone is
safe and that no deleted value will come back in the system.
Garage makes use of Sled's atomic operations (such as compare-and-swap and
transactions) to ensure that only tombstones that have been correctly
propagated to other nodes are ever deleted from the local entry tree.
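
For illustration, deleting a tombstone only if its value has not changed in the meantime can be expressed with Sled's compare-and-swap, roughly as follows (simplified; the actual logic lives in `table/gc.rs`):

```rust
/// Simplified sketch: atomically remove a tombstone from the local tree
/// only if its stored value is still exactly `expected`; if the entry was
/// overwritten in the meantime, the compare-and-swap fails and we keep it.
fn delete_tombstone_if_unchanged(
    tree: &sled::Tree,
    key: &[u8],
    expected: &[u8],
) -> sled::Result<bool> {
    let cas = tree.compare_and_swap::<_, _, Vec<u8>>(key, Some(expected), None)?;
    Ok(cas.is_ok())
}
```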
This GC is safe in the following sense: no non-tombstone data is ever deleted
from Garage tables.
**However**, there is an issue with the way this interacts with data
rebalancing in the case when a partition is moving between nodes. If a node has
some data of a partition for which it is not responsible, it has to offload it.
However that offload process takes some time. In that interval, the GC does not
check with that node if it has the tombstone before deleting the tombstone, so
perhaps it doesn't have it and when the offload finally happens, old data comes
back in the system.
**PR 135 mostly fixes this** by implementing a 24-hour delay before anything is
garbage collected in a table. This works under the assumption that rebalances
that follow data shuffling terminate in less than 24 hours.
**However**, in distributed systems, it is generally considered a bad practice
to make assumptions that information propagates in a certain time interval:
this consists in making a synchrony assumption, meaning that we are basically
assuming a computing model that has much stronger properties than otherwise. To
maximize the applicability of Garage, we would like to remove this assumption,
and implement a system where time does not play a role. To do this, we would
need to find a way to safely disable the GC when data is being shuffled around,
and safely detect that the shuffling has terminated and thus the GC can be
resumed. This introduces some complexity to the protocol and hasn't been
tackled yet.
### 2. Garbage collection of data blocks (in `data/` directory)
Blocks in the data directory are reference-counted. In Garage versions before
PR #135, blocks could get deleted from local disk as soon as their reference
counter reached zero. We had a mechanism to not trigger this immediately at the
rc-reaches-zero event, but the cleanup could be triggered by other means (for
example by a block repair operation...). PR #135 added a safety measure so that
blocks never get deleted in a 10 minute interval following the time when the RC
reaches zero. This is a measure to make race conditions such as #39 impossible.
We would have liked to use a larger delay (e.g. 24 hours), but in the case of a
rebalance of data, this would have caused disk utilization to explode
during the rebalancing, only to shrink again after 24 hours. The 10-minute
delay is a compromise that gives good security while not having this problem of
disk space explosion on rebalance.

View file

@@ -446,7 +446,7 @@ impl AdminRpcHandler {
         if opt.detailed {
             writeln!(
                 &mut ret,
-                " number of blocks: {}",
+                " number of RC entries (~= number of blocks): {}",
                 self.garage.block_manager.rc_len()
             )
             .unwrap();

View file

@@ -8,8 +8,7 @@ pub enum Command {
     #[structopt(name = "server")]
     Server,

-    /// Print identifier (public key) of this garage node.
-    /// Generates a new keypair if necessary.
+    /// Print identifier (public key) of this Garage node
     #[structopt(name = "node-id")]
     NodeId(NodeIdOpt),

View file

@@ -1,3 +1,4 @@
+use std::convert::TryInto;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::Duration;
@@ -31,10 +32,20 @@ pub const INLINE_THRESHOLD: usize = 3072;
 pub const BACKGROUND_WORKERS: u64 = 1;
 pub const BACKGROUND_TRANQUILITY: u32 = 3;

-const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
-const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
+// Timeout for RPCs that read and write blocks to remote nodes
+const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Timeout for RPCs that ask other nodes whether they need a copy
+// of a given block before we delete it locally
 const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
-const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+
+// The delay between the time where a resync operation fails
+// and the time when it is retried.
+const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
+
+// The delay between the moment when the reference counter
+// drops to zero, and the moment where we allow ourselves
+// to delete the block locally.
+const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);

 /// RPC messages used to share blocks of data between nodes
 #[derive(Debug, Serialize, Deserialize)]
@@ -180,7 +191,7 @@ impl BlockManager {
     /// that are required because of refcount > 0, and will try
     /// to fix any mismatch between the two.
     pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
-        // 1. Repair blocks from RC table
+        // 1. Repair blocks from RC table.
         let garage = self.garage.load_full().unwrap();
         let mut last_hash = None;
         for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() {
@@ -245,40 +256,51 @@ impl BlockManager {
     /// Increment the number of time a block is used, putting it to resynchronization if it is
     /// required, but not known
     pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
-        let old_rc = self.rc.fetch_and_update(&hash, |old| {
-            let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
-            Some(u64::to_be_bytes(old_v + 1).to_vec())
-        })?;
-        let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
-        if old_rc == 0 {
-            self.put_to_resync(hash, BLOCK_RW_TIMEOUT)?;
+        let old_rc = self
+            .rc
+            .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
+        let old_rc = RcEntry::parse_opt(old_rc);
+        if old_rc.is_zero() {
+            // When the reference counter is incremented, there is
+            // normally a node that is responsible for sending us the
+            // data of the block. However that operation may fail,
+            // so in all cases we add the block here to the todo list
+            // to check later that it arrived correctly, and if not
+            // we will fecth it from someone.
+            self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
Review

why are we using 2 times the timeout?

Review

Because 1/ it's not a big issue if we do it too late, 2/ it makes sure that if there was a block put operation, it had enough time to complete/fail even if it took slightly longer than the timeout delay for some reason (clocks are imprecise)
         }
         Ok(())
     }

     /// Decrement the number of time a block is used
     pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
-        let new_rc = self.rc.update_and_fetch(&hash, |old| {
-            let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
-            if old_v > 1 {
-                Some(u64::to_be_bytes(old_v - 1).to_vec())
-            } else {
-                None
+        let new_rc = self
+            .rc
+            .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
+        let new_rc = RcEntry::parse_opt(new_rc);
+        if let RcEntry::Deletable { .. } = new_rc {
+            self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
Review

Why are we adding a hard-coded 10 seconds?

Review

Same reason, because clocks are imprecise and it's not a big issue if we do it too late
-            }
-        })?;
-        if new_rc.is_none() {
-            self.put_to_resync(hash, BLOCK_GC_TIMEOUT)?;
         }
         Ok(())
     }

     /// Read a block's reference count
-    pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
-        Ok(self
-            .rc
-            .get(hash.as_ref())?
-            .map(u64_from_be_bytes)
-            .unwrap_or(0))
+    fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
+        Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
+    }
+
+    /// Delete an entry in the RC table if it is deletable and the
+    /// deletion time has passed
+    fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
+        let now = now_msec();
+        self.rc.update_and_fetch(&hash, |rcval| {
+            let updated = match RcEntry::parse_opt(rcval) {
+                RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
+                v => v,
+            };
+            updated.serialize()
+        })?;
+        Ok(())
     }
     // ---- Reading and writing blocks locally ----

@@ -300,7 +322,7 @@ impl BlockManager {
             Ok(f) => f,
             Err(e) => {
                 // Not found but maybe we should have had it ??
-                self.put_to_resync(hash, Duration::from_millis(0))?;
+                self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
Review

2 times here also.
                 return Err(Into::into(e));
             }
         };

@@ -314,6 +336,7 @@ impl BlockManager {
                 .await
                 .move_block_to_corrupted(hash, self)
                 .await?;
+            self.put_to_resync(hash, Duration::from_millis(0))?;
             return Err(Error::CorruptData(*hash));
         }

@@ -328,7 +351,7 @@ impl BlockManager {
             .await
             .check_block_status(hash, self)
             .await?;
-        Ok(needed && !exists)
+        Ok(needed.is_nonzero() && !exists)
     }

     /// Utility: gives the path of the directory in which a block should be found

@@ -349,7 +372,7 @@ impl BlockManager {
     // ---- Resync loop ----

     pub fn spawn_background_worker(self: Arc<Self>) {
-        // Launch 2 simultaneous workers for background resync loop preprocessing
+        // Launch n simultaneous workers for background resync loop preprocessing
         for i in 0..BACKGROUND_WORKERS {
             let bm2 = self.clone();
             let background = self.system.background.clone();

@@ -400,14 +423,14 @@ impl BlockManager {
     async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
         if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
-            let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
+            let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
             let now = now_msec();
             if now >= time_msec {
                 let hash = Hash::try_from(&hash_bytes[..]).unwrap();
                 let res = self.resync_block(&hash).await;
                 if let Err(e) = &res {
                     warn!("Error when resyncing {:?}: {}", hash, e);
-                    self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
+                    self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
                 }
                 Ok(true)
             } else {

@@ -437,15 +460,18 @@
             .check_block_status(hash, self)
             .await?;

-        if exists != needed {
-            info!(
-                "Resync block {:?}: exists {}, needed {}",
-                hash, exists, needed
+        if exists != needed.is_needed() || exists != needed.is_nonzero() {
+            debug!(
+                "Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
+                hash,
+                exists,
+                needed.is_nonzero(),
+                needed.is_deletable(),
             );
         }

-        if exists && !needed {
-            trace!("Offloading block {:?}", hash);
+        if exists && needed.is_deletable() {
+            info!("Resync block {:?}: offloading and deleting", hash);

             let mut who = self.replication.write_nodes(hash);
             if who.len() < self.replication.write_quorum() {

@@ -488,7 +514,7 @@ impl BlockManager {
                     need_nodes.len()
                 );

-                let put_block_message = self.read_block(hash).await.err_context("PutBlock RPC")?;
+                let put_block_message = self.read_block(hash).await?;
                 self.system
                     .rpc
                     .try_call_many(

@@ -499,10 +525,11 @@ impl BlockManager {
                             .with_quorum(need_nodes.len())
                             .with_timeout(BLOCK_RW_TIMEOUT),
                     )
-                    .await?;
+                    .await
+                    .err_context("PutBlock RPC")?;
             }
             info!(
-                "Deleting block {:?}, offload finished ({} / {})",
+                "Deleting unneeded block {:?}, offload finished ({} / {})",
                 hash,
                 need_nodes.len(),
                 who.len()

@@ -513,12 +540,16 @@
                 .await
                 .delete_if_unneeded(hash, self)
                 .await?;
+
+            self.clear_deleted_block_rc(hash)?;
         }

-        if needed && !exists {
-            // TODO find a way to not do this if they are sending it to us
-            // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
-            // between the RC being incremented and this part being called.
+        if needed.is_nonzero() && !exists {
+            info!(
+                "Resync block {:?}: fetching absent but needed block (refcount > 0)",
+                hash
+            );
+
             let block_data = self.rpc_get_block(hash).await?;
             self.write_block(hash, &block_data[..]).await?;
         }

@@ -526,6 +557,8 @@ impl BlockManager {
         Ok(())
     }

+    // ---- Utility: iteration on files in the data directory ----
+
     async fn for_each_file<F, Fut, State>(
         &self,
         state: State,

@@ -608,7 +641,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
 struct BlockStatus {
     exists: bool,
-    needed: bool,
+    needed: RcEntry,
 }

 impl BlockManagerLocked {

@@ -620,7 +653,7 @@ impl BlockManagerLocked {
         let path = mgr.block_path(hash);
         let exists = fs::metadata(&path).await.is_ok();
-        let needed = mgr.get_block_rc(hash)? > 0;
+        let needed = mgr.get_block_rc(hash)?;

         Ok(BlockStatus { exists, needed })
     }

@@ -659,14 +692,13 @@ impl BlockManagerLocked {
         let mut path2 = path.clone();
         path2.set_extension("corrupted");
         fs::rename(path, path2).await?;
-        mgr.put_to_resync(hash, Duration::from_millis(0))?;
         Ok(())
     }

     async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
         let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;

-        if exists && !needed {
+        if exists && needed.is_deletable() {
             let path = mgr.block_path(hash);
             fs::remove_file(path).await?;
         }

@@ -674,9 +706,103 @@ impl BlockManagerLocked {
     }
 }

-fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
-    assert!(bytes.as_ref().len() == 8);
-    let mut x8 = [0u8; 8];
+/// Describes the state of the reference counter for a block
+#[derive(Clone, Copy, Debug)]
+enum RcEntry {
Review

Just me being nitpicking but we created this enum because we want a bit more than a Reference Counter, something like a Reference Counter with Delay. We could rename it to `RcDelayEntry` or `RcDelay`. But feel free to keep this name too if you are more comfortable with the current name :)
-    x8.copy_from_slice(bytes.as_ref());
-    u64::from_be_bytes(x8)
-}
+    /// Present: the block has `count` references, with `count` > 0.
+    ///
+    /// This is stored as u64::to_be_bytes(count)
+    Present { count: u64 },
+
+    /// Deletable: the block has zero references, and can be deleted
+    /// once time (returned by now_msec) is larger than at_time
+    /// (in millis since Unix epoch)
+    ///
+    /// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
+    /// (this allows for the data format to be backwards compatible with
+    /// previous Garage versions that didn't have this intermediate state)
+    Deletable { at_time: u64 },
Review

What is our policy for backward compatibility: is there a risk that we make our code worse over time to ease some backward compatibility? In this specific case, it does not seem very critical to me but I am curious about our code strategy / technical debt strategy :)

Review

I don't see this format as technical debt. It's a good format with a simple custom serialization that happens to be compatible with the old format.

It's nice here that we don't have to break compatibility, but if we had to do it we would have done it. (we did break compatibility for the table GC)

I think until release 1.0 we don't need to think too much about backward compatibility. v0.4 removes at least one code path for data format migration, and we generally don't want to have those for now.
/// Absent: the block has zero references, and can be deleted
/// immediately
Absent,
}
impl RcEntry {
fn parse(bytes: &[u8]) -> Self {
if bytes.len() == 8 {
RcEntry::Present {
count: u64::from_be_bytes(bytes.try_into().unwrap()),
}
} else if bytes.len() == 16 {
RcEntry::Deletable {
at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
}
} else {
panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.",
bytes
)
}
}
fn parse_opt<V: AsRef<[u8]>>(bytes: Option<V>) -> Self {
bytes
.map(|b| Self::parse(b.as_ref()))
.unwrap_or(Self::Absent)
}
fn serialize(self) -> Option<Vec<u8>> {
match self {
RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()),
RcEntry::Deletable { at_time } => {
Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat())
}
RcEntry::Absent => None,
}
}
fn increment(self) -> Self {
let old_count = match self {
RcEntry::Present { count } => count,
_ => 0,
};
RcEntry::Present {
count: old_count + 1,
}
}
fn decrement(self) -> Self {
match self {
RcEntry::Present { count } => {
if count > 1 {
RcEntry::Present { count: count - 1 }
} else {
RcEntry::Deletable {
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
}
}
}
del => del,
}
}
fn is_zero(&self) -> bool {
matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent)
}
fn is_nonzero(&self) -> bool {
!self.is_zero()
}
fn is_deletable(&self) -> bool {
match self {
RcEntry::Present { .. } => false,
RcEntry::Deletable { at_time } => now_msec() > *at_time,
RcEntry::Absent => true,
}
}
fn is_needed(&self) -> bool {
!self.is_deletable()
}
}

View file

@@ -12,6 +12,7 @@ use garage_util::error::*;
 use garage_rpc::system::System;

 use crate::crdt::Crdt;
+use crate::gc::GcTodoEntry;
 use crate::replication::*;
 use crate::schema::*;

@@ -54,7 +55,7 @@ where
             .expect("Unable to open DB Merkle TODO tree");

         let gc_todo = db
-            .open_tree(&format!("{}:gc_todo", name))
+            .open_tree(&format!("{}:gc_todo_v2", name))
Review

Here, do we create a new db/table/key format because the previous one had a different format that is now incompatible? Does it mean that we will have a "dead" gc_todo table on already deployed nodes? I do not see that as a blocking problem either but it is good to know.

Review

Yes it's a dead table. If the migration is executed properly, i.e. Garage is read-only for enough time for the GC in the old version to process everything, `gc_todo` should be empty.
             .expect("Unable to open DB tree");

         Arc::new(Self {

@@ -176,7 +177,7 @@ where
                 let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
                 let nodes = self.replication.write_nodes(&pk_hash);
                 if nodes.first() == Some(&self.system.id) {
-                    self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+                    GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
                 }
             }
         }
} }

View file

@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::sync::Arc;
 use std::time::Duration;

@@ -12,7 +13,8 @@ use futures_util::future::*;
 use tokio::sync::watch;

 use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::*;
+use garage_util::time::*;

 use garage_rpc::system::System;
 use garage_rpc::*;

@@ -24,7 +26,12 @@ use crate::schema::*;
 const TABLE_GC_BATCH_SIZE: usize = 1024;
 const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);

-pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+// GC delay for table entries: 1 day (24 hours)
+// (the delay before the entry is added in the GC todo list
+// and the moment the garbage collection actually happens)
+const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
+
+pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
     system: Arc<System>,
     data: Arc<TableData<F, R>>,

@@ -72,63 +79,100 @@ where
     async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
         while !*must_exit.borrow() {
             match self.gc_loop_iter().await {
-                Ok(true) => {
+                Ok(None) => {
                     // Stuff was done, loop immediately
-                    continue;
                 }
-                Ok(false) => {
-                    // Nothing was done, sleep for some time (below)
+                Ok(Some(wait_delay)) => {
+                    // Nothing was done, wait specified delay.
+                    select! {
+                        _ = tokio::time::sleep(wait_delay).fuse() => {},
+                        _ = must_exit.changed().fuse() => {},
+                    }
                 }
                 Err(e) => {
                     warn!("({}) Error doing GC: {}", self.data.name, e);
                 }
             }
-            select! {
-                _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {},
-                _ = must_exit.changed().fuse() => {},
-            }
         }
     }

-    async fn gc_loop_iter(&self) -> Result<bool, Error> {
+    async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
+        let now = now_msec();
+
         let mut entries = vec![];
         let mut excluded = vec![];

-        for item in self.data.gc_todo.iter() {
-            let (k, vhash) = item?;
+        // List entries in the GC todo list
+        // These entries are put there when a tombstone is inserted in the table
+        // (see update_entry in data.rs)
+        for entry_kv in self.data.gc_todo.iter() {
+            let (k, vhash) = entry_kv?;
+            let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
+
+            if todo_entry.deletion_time() > now {
+                if entries.is_empty() && excluded.is_empty() {
+                    // If the earliest entry in the todo list shouldn't yet be processed,
+                    // return a duration to wait in the loop
+                    return Ok(Some(Duration::from_millis(
+                        todo_entry.deletion_time() - now,
+                    )));
+                } else {
+                    // Otherwise we have some entries to process, do a normal iteration.
+                    break;
+                }
+            }

             let vhash = Hash::try_from(&vhash[..]).unwrap();

-            let v_opt = self
+            // Check if the tombstone is still the current value of the entry.
Review

Can you confirm that one reason to invalidate a tombstone is a case where I upload a file, delete it, and then reupload it?

Review

Yes! For the object tables, an entry can take the following sequence of values:

  1. an old version (not a tombstone)
  2. a deletion marker (a tombstone which we want to GC if possible)
  3. a new version (not a tombstone)
+            // If not, we don't actually want to GC it, and we will remove it
+            // from the gc_todo table later (below).
+            todo_entry.value = self
                 .data
                 .store
                 .get(&k[..])?
-                .filter(|v| blake2sum(&v[..]) == vhash);
+                .filter(|v| blake2sum(&v[..]) == vhash)
+                .map(|v| v.to_vec());

-            if let Some(v) = v_opt {
-                entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+            if todo_entry.value.is_some() {
+                entries.push(todo_entry);
                 if entries.len() >= TABLE_GC_BATCH_SIZE {
                     break;
                 }
             } else {
-                excluded.push((k, vhash));
+                excluded.push(todo_entry);
             }
         }

-        for (k, vhash) in excluded {
-            self.todo_remove_if_equal(&k[..], vhash)?;
+        // Remove from gc_todo entries for tombstones where we have
+        // detected that the current value has changed and
+        // is no longer a tombstone.
+        for entry in excluded {
+            entry.remove_if_equal(&self.data.gc_todo)?;
         }

+        // Remaining in `entries` is the list of entries we want to GC,
+        // and for which they are still currently tombstones in the table.
+
         if entries.is_empty() {
-            // Nothing to do in this iteration
-            return Ok(false);
+            // Nothing to do in this iteration (no entries present)
+            // Wait for a default delay of 60 seconds
+            return Ok(Some(Duration::from_secs(60)));
         }

         debug!("({}) GC: doing {} items", self.data.name, entries.len());

+        // Split entries to GC by the set of nodes on which they are stored.
+        // Here we call them partitions but they are not exactly
+        // the same as partitions as defined in the ring: those partitions
+        // are defined by the first 8 bits of the hash, but two of these
+        // partitions can be stored on the same set of nodes.
+        // Here we detect when entries are stored on the same set of nodes:
+        // even if they are not in the same 8-bit partition, we can still
+        // handle them together.
         let mut partitions = HashMap::new();
-        for (k, vhash, v) in entries {
-            let pkh = Hash::try_from(&k[..32]).unwrap();
+        for entry in entries {
+            let pkh = Hash::try_from(&entry.key[..32]).unwrap();
             let mut nodes = self.data.replication.write_nodes(&pkh);
             nodes.retain(|x| *x != self.system.id);
             nodes.sort();

@@ -136,9 +180,12 @@ where
             if !partitions.contains_key(&nodes) {
                 partitions.insert(nodes.clone(), vec![]);
             }
-            partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
+            partitions.get_mut(&nodes).unwrap().push(entry);
         }

+        // For each set of nodes that contains some items,
+        // ensure they are aware of the tombstone status, and once they
+        // are, instruct them to delete the entries.
         let resps = join_all(
             partitions
                 .into_iter()

@@ -146,6 +193,8 @@ where
         )
         .await;

+        // Collect errors and return a single error value even if several
+        // errors occurred.
         let mut errs = vec![];
         for resp in resps {
             if let Err(e) = resp {

@@ -154,7 +203,7 @@
         }

         if errs.is_empty() {
-            Ok(true)
+            Ok(None)
         } else {
             Err(Error::Message(
                 errs.into_iter()

@@ -162,23 +211,42 @@ where
                     .collect::<Vec<_>>()
                     .join(", "),
             ))
+            .err_context("in try_send_and_delete in table GC:")
         }
     }

     async fn try_send_and_delete(
         &self,
         nodes: Vec<Uuid>,
-        items: Vec<(ByteBuf, Hash, ByteBuf)>,
+        mut items: Vec<GcTodoEntry>,
     ) -> Result<(), Error> {
         let n_items = items.len();

+        // Strategy: we first send all of the values to the remote nodes,
Review

Does it mean that deleting a tombstone is a particularly expensive operation? Could we create a denial of service simply by deleting a lot of data in a row? Or deleting a lot of data in a row while a node is down? But it might be totally ok as it seems we batch our requests, can you confirm?

Edit: I just understood that `nodes` does not map to all nodes of the cluster but to all nodes of the same "partition"

Review

It's not that expensive (tombstones are small and can be batched), and it's the cost of a correct algorithm :) At least we are not doing a full Raft or Paxos
+        // to ensure that they are aware of the tombstone state,
+        // and that the previous state was correctly overwritten
+        // (if they have a newer state that overrides the tombstone, that's fine).
+        // Second, once everyone is at least at the tombstone state,
+        // we instruct everyone to delete the tombstone IF that is still their current state.
+        // If they are now at a different state, it means that that state overrides the
+        // tombstone in the CRDT lattice, and it will be propagated back to us at some point
+        // (either just a regular update that hasn't reached us yet, or later when the
+        // table is synced).
+        // Here, we store in updates all of the tombstones to send for step 1,
+        // and in deletes the list of keys and hashes of value for step 2.
         let mut updates = vec![];
         let mut deletes = vec![];
-        for (k, vhash, v) in items {
-            updates.push(v);
-            deletes.push((k, vhash));
+        for item in items.iter_mut() {
+            updates.push(ByteBuf::from(item.value.take().unwrap()));
+            deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
         }

+        // Step 1: ensure everyone is at least at tombstone in CRDT lattice
+        // Here the quorum is nodes.len(): we cannot tolerate even a single failure,
+        // otherwise old values before the tombstone might come back in the data.
+        // GC'ing is not a critical function of the system, so it's not a big
+        // deal if we can't do it right now.
         self.system
             .rpc
             .try_call_many(

@@ -189,40 +257,43 @@ where
                 .with_quorum(nodes.len())
                 .with_timeout(TABLE_GC_RPC_TIMEOUT),
             )
-            .await?;
+            .await
+            .err_context("GC: send tombstones")?;

         info!(
             "({}) GC: {} items successfully pushed, will try to delete.",
             self.data.name, n_items
         );

+        // Step 2: delete tombstones everywhere.
+        // Here we also fail if even a single node returns a failure:
+        // it means that the garbage collection wasn't completed and has
+        // to be retried later.
         self.system
             .rpc
             .try_call_many(
                 &self.endpoint,
                 &nodes[..],
-                GcRpc::DeleteIfEqualHash(deletes.clone()),
+                GcRpc::DeleteIfEqualHash(deletes),
                 RequestStrategy::with_priority(PRIO_BACKGROUND)
                     .with_quorum(nodes.len())
                     .with_timeout(TABLE_GC_RPC_TIMEOUT),
             )
-            .await?;
+            .await
+            .err_context("GC: remote delete tombstones")?;

-        for (k, vhash) in deletes {
-            self.data.delete_if_equal_hash(&k[..], vhash)?;
-            self.todo_remove_if_equal(&k[..], vhash)?;
+        // GC has been successfull for all of these entries.
+        // We now remove them all from our local table and from the GC todo list.
+        for item in items {
+            self.data
+                .delete_if_equal_hash(&item.key[..], item.value_hash)
+                .err_context("GC: local delete tombstones")?;
+            item.remove_if_equal(&self.data.gc_todo)
+                .err_context("GC: remove from todo list after successfull GC")?;
         }

         Ok(())
     }
-    fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> {
-        let _ = self
-            .data
-            .gc_todo
-            .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
-        Ok(())
-    }
 }

 #[async_trait]

@@ -240,7 +311,6 @@ where
             GcRpc::DeleteIfEqualHash(items) => {
                 for (key, vhash) in items.iter() {
                     self.data.delete_if_equal_hash(&key[..], *vhash)?;
-                    self.todo_remove_if_equal(&key[..], *vhash)?;
                 }
                 Ok(GcRpc::Ok)
             }

@@ -248,3 +318,77 @@
         }
     }
 }

+/// An entry stored in the gc_todo Sled tree associated with the table
+/// Contains helper function for parsing, saving, and removing
+/// such entry in Sled
+///
+/// Format of an entry:
+/// - key = 8 bytes: timestamp of tombstone
Review

Here also we have some byte manipulations, this is even the purpose of this object. Are we doing them because of a Sled limitation (it needs a primitive type as a key) or for backward compatibility reasons (we were storing a byte array before, so we keep it)? Are you not afraid that it could introduce bugs with time as we are losing some checks done by the compiler?

Review

We are limited by Sled that basically acts as a `BTreeMap<Vec<u8>, Vec<u8>>`: we have to manage stuff as byte arrays for both keys and values. It's true we have to be extra careful with this: that's the reason why I extracted the serialization and parsing logic in a separate struct. This would typically be a good place for unit testing if complexity gets out of hand (for now I don't think it has).
/// (used to implement GC delay)
/// n bytes: key in the main data table
/// - value = hash of the table entry to delete (the tombstone)
/// for verification purpose, because we don't want to delete
/// things that aren't tombstones
pub(crate) struct GcTodoEntry {
tombstone_timestamp: u64,
key: Vec<u8>,
value_hash: Hash,
value: Option<Vec<u8>>,
}
impl GcTodoEntry {
/// Creates a new GcTodoEntry (not saved in Sled) from its components:
/// the key of an entry in the table, and the hash of the associated
/// serialized value
pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
Self {
tombstone_timestamp: now_msec(),
key,
value_hash,
value: None,
}
}
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
Self {
tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
key: sled_k[8..].to_vec(),
value_hash: Hash::try_from(sled_v).unwrap(),
value: None,
}
}
/// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
/// Removes the GcTodoEntry from the gc_todo tree if the
/// hash of the serialized value is the same here as in the tree.
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
&self.todo_table_key()[..],
Some(self.value_hash),
None,
)?;
Ok(())
}
fn todo_table_key(&self) -> Vec<u8> {
[
&u64::to_be_bytes(self.tombstone_timestamp)[..],
&self.key[..],
]
.concat()
}
fn deletion_time(&self) -> u64 {
self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64
}
}