Move block RC code to separate rc.rs
This commit is contained in:
parent
c3982a90b6
commit
8fd6745745
3 changed files with 175 additions and 136 deletions
|
@ -5,3 +5,4 @@ pub mod manager;
|
|||
|
||||
mod block;
|
||||
mod metrics;
|
||||
mod rc;
|
||||
|
|
|
@ -31,6 +31,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
|
|||
|
||||
use crate::metrics::*;
|
||||
use crate::block::*;
|
||||
use crate::rc::*;
|
||||
|
||||
/// Size under which data will be stored inlined in database instead of as files
|
||||
pub const INLINE_THRESHOLD: usize = 3072;
|
||||
|
@ -51,7 +52,7 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
|
|||
// The delay between the moment when the reference counter
|
||||
// drops to zero, and the moment where we allow ourselves
|
||||
// to delete the block locally.
|
||||
const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||
|
||||
/// RPC messages used to share blocks of data between nodes
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
@ -86,7 +87,7 @@ pub struct BlockManager {
|
|||
|
||||
mutation_lock: Mutex<BlockManagerLocked>,
|
||||
|
||||
rc: sled::Tree,
|
||||
pub rc: BlockRc,
|
||||
|
||||
resync_queue: SledCountedTree,
|
||||
resync_notify: Notify,
|
||||
|
@ -114,6 +115,7 @@ impl BlockManager {
|
|||
let rc = db
|
||||
.open_tree("block_local_rc")
|
||||
.expect("Unable to open block_local_rc tree");
|
||||
let rc = BlockRc::new(rc);
|
||||
|
||||
let resync_queue = db
|
||||
.open_tree("block_local_resync_queue")
|
||||
|
@ -213,7 +215,7 @@ impl BlockManager {
|
|||
/// to fix any mismatch between the two.
|
||||
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
||||
// 1. Repair blocks from RC table.
|
||||
for (i, entry) in self.rc.iter().enumerate() {
|
||||
for (i, entry) in self.rc.rc.iter().enumerate() {
|
||||
let (hash, _) = entry?;
|
||||
let hash = Hash::try_from(&hash[..]).unwrap();
|
||||
self.put_to_resync(&hash, Duration::from_secs(0))?;
|
||||
|
@ -261,7 +263,7 @@ impl BlockManager {
|
|||
|
||||
/// Get number of items in the refcount table
|
||||
pub fn rc_len(&self) -> usize {
|
||||
self.rc.len()
|
||||
self.rc.rc.len()
|
||||
}
|
||||
|
||||
//// ----- Managing the reference counter ----
|
||||
|
@ -269,11 +271,7 @@ impl BlockManager {
|
|||
/// Increment the number of time a block is used, putting it to resynchronization if it is
|
||||
/// required, but not known
|
||||
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let old_rc = self
|
||||
.rc
|
||||
.fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
|
||||
let old_rc = RcEntry::parse_opt(old_rc);
|
||||
if old_rc.is_zero() {
|
||||
if self.rc.block_incref(hash)? {
|
||||
// When the reference counter is incremented, there is
|
||||
// normally a node that is responsible for sending us the
|
||||
// data of the block. However that operation may fail,
|
||||
|
@ -287,35 +285,17 @@ impl BlockManager {
|
|||
|
||||
/// Decrement the number of time a block is used
|
||||
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let new_rc = self
|
||||
.rc
|
||||
.update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
|
||||
let new_rc = RcEntry::parse_opt(new_rc);
|
||||
if let RcEntry::Deletable { .. } = new_rc {
|
||||
if self.rc.block_decref(hash)? {
|
||||
// When the RC is decremented, it might drop to zero,
|
||||
// indicating that we don't need the block.
|
||||
// There is a delay before we garbage collect it;
|
||||
// make sure that it is handled in the resync loop
|
||||
// after that delay has passed.
|
||||
self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read a block's reference count
|
||||
fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
|
||||
Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
|
||||
}
|
||||
|
||||
/// Delete an entry in the RC table if it is deletable and the
|
||||
/// deletion time has passed
|
||||
fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let now = now_msec();
|
||||
self.rc.update_and_fetch(&hash, |rcval| {
|
||||
let updated = match RcEntry::parse_opt(rcval) {
|
||||
RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
|
||||
v => v,
|
||||
};
|
||||
updated.serialize()
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- Reading and writing blocks locally ----
|
||||
|
||||
/// Write a block to disk
|
||||
|
@ -659,7 +639,7 @@ impl BlockManager {
|
|||
.delete_if_unneeded(hash, self)
|
||||
.await?;
|
||||
|
||||
self.clear_deleted_block_rc(hash)?;
|
||||
self.rc.clear_deleted_block_rc(hash)?;
|
||||
}
|
||||
|
||||
if needed.is_nonzero() && !exists {
|
||||
|
@ -773,7 +753,7 @@ impl BlockManagerLocked {
|
|||
mgr: &BlockManager,
|
||||
) -> Result<BlockStatus, Error> {
|
||||
let exists = mgr.is_block_compressed(hash).await.is_ok();
|
||||
let needed = mgr.get_block_rc(hash)?;
|
||||
let needed = mgr.rc.get_block_rc(hash)?;
|
||||
|
||||
Ok(BlockStatus { exists, needed })
|
||||
}
|
||||
|
@ -869,107 +849,6 @@ impl BlockManagerLocked {
|
|||
}
|
||||
}
|
||||
|
||||
/// Describes the state of the reference counter for a block
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum RcEntry {
|
||||
/// Present: the block has `count` references, with `count` > 0.
|
||||
///
|
||||
/// This is stored as u64::to_be_bytes(count)
|
||||
Present { count: u64 },
|
||||
|
||||
/// Deletable: the block has zero references, and can be deleted
|
||||
/// once time (returned by now_msec) is larger than at_time
|
||||
/// (in millis since Unix epoch)
|
||||
///
|
||||
/// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
|
||||
/// (this allows for the data format to be backwards compatible with
|
||||
/// previous Garage versions that didn't have this intermediate state)
|
||||
Deletable { at_time: u64 },
|
||||
|
||||
/// Absent: the block has zero references, and can be deleted
|
||||
/// immediately
|
||||
Absent,
|
||||
}
|
||||
|
||||
impl RcEntry {
|
||||
fn parse(bytes: &[u8]) -> Self {
|
||||
if bytes.len() == 8 {
|
||||
RcEntry::Present {
|
||||
count: u64::from_be_bytes(bytes.try_into().unwrap()),
|
||||
}
|
||||
} else if bytes.len() == 16 {
|
||||
RcEntry::Deletable {
|
||||
at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
|
||||
}
|
||||
} else {
|
||||
panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.",
|
||||
bytes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_opt<V: AsRef<[u8]>>(bytes: Option<V>) -> Self {
|
||||
bytes
|
||||
.map(|b| Self::parse(b.as_ref()))
|
||||
.unwrap_or(Self::Absent)
|
||||
}
|
||||
|
||||
fn serialize(self) -> Option<Vec<u8>> {
|
||||
match self {
|
||||
RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()),
|
||||
RcEntry::Deletable { at_time } => {
|
||||
Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat())
|
||||
}
|
||||
RcEntry::Absent => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn increment(self) -> Self {
|
||||
let old_count = match self {
|
||||
RcEntry::Present { count } => count,
|
||||
_ => 0,
|
||||
};
|
||||
RcEntry::Present {
|
||||
count: old_count + 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn decrement(self) -> Self {
|
||||
match self {
|
||||
RcEntry::Present { count } => {
|
||||
if count > 1 {
|
||||
RcEntry::Present { count: count - 1 }
|
||||
} else {
|
||||
RcEntry::Deletable {
|
||||
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
del => del,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_zero(&self) -> bool {
|
||||
matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent)
|
||||
}
|
||||
|
||||
fn is_nonzero(&self) -> bool {
|
||||
!self.is_zero()
|
||||
}
|
||||
|
||||
fn is_deletable(&self) -> bool {
|
||||
match self {
|
||||
RcEntry::Present { .. } => false,
|
||||
RcEntry::Deletable { at_time } => now_msec() > *at_time,
|
||||
RcEntry::Absent => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_needed(&self) -> bool {
|
||||
!self.is_deletable()
|
||||
}
|
||||
}
|
||||
|
||||
/// Counts the number of errors when resyncing a block,
|
||||
/// and the time of the last try.
|
||||
/// Used to implement exponential backoff.
|
||||
|
|
159
src/block/rc.rs
Normal file
159
src/block/rc.rs
Normal file
|
@ -0,0 +1,159 @@
|
|||
use std::convert::TryInto;
|
||||
|
||||
use garage_util::error::*;
|
||||
use garage_util::data::*;
|
||||
use garage_util::time::*;
|
||||
|
||||
use crate::manager::BLOCK_GC_DELAY;
|
||||
|
||||
pub struct BlockRc {
|
||||
pub(crate) rc: sled::Tree,
|
||||
}
|
||||
|
||||
impl BlockRc {
|
||||
pub(crate) fn new(rc: sled::Tree) -> Self {
|
||||
Self {
|
||||
rc
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the reference counter associated to a hash.
|
||||
/// Returns true if the RC goes from zero to nonzero.
|
||||
pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
|
||||
let old_rc = self
|
||||
.rc
|
||||
.fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
|
||||
let old_rc = RcEntry::parse_opt(old_rc);
|
||||
Ok(old_rc.is_zero())
|
||||
}
|
||||
|
||||
/// Decrement the reference counter associated to a hash.
|
||||
/// Returns true if the RC is now zero.
|
||||
pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
|
||||
let new_rc = self
|
||||
.rc
|
||||
.update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
|
||||
let new_rc = RcEntry::parse_opt(new_rc);
|
||||
Ok(matches!(new_rc, RcEntry::Deletable {..}))
|
||||
}
|
||||
|
||||
/// Read a block's reference count
|
||||
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
|
||||
Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
|
||||
}
|
||||
|
||||
/// Delete an entry in the RC table if it is deletable and the
|
||||
/// deletion time has passed
|
||||
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||
let now = now_msec();
|
||||
self.rc.update_and_fetch(&hash, |rcval| {
|
||||
let updated = match RcEntry::parse_opt(rcval) {
|
||||
RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
|
||||
v => v,
|
||||
};
|
||||
updated.serialize()
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Describes the state of the reference counter for a block
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub(crate) enum RcEntry {
|
||||
/// Present: the block has `count` references, with `count` > 0.
|
||||
///
|
||||
/// This is stored as u64::to_be_bytes(count)
|
||||
Present { count: u64 },
|
||||
|
||||
/// Deletable: the block has zero references, and can be deleted
|
||||
/// once time (returned by now_msec) is larger than at_time
|
||||
/// (in millis since Unix epoch)
|
||||
///
|
||||
/// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
|
||||
/// (this allows for the data format to be backwards compatible with
|
||||
/// previous Garage versions that didn't have this intermediate state)
|
||||
Deletable { at_time: u64 },
|
||||
|
||||
/// Absent: the block has zero references, and can be deleted
|
||||
/// immediately
|
||||
Absent,
|
||||
}
|
||||
|
||||
impl RcEntry {
|
||||
fn parse(bytes: &[u8]) -> Self {
|
||||
if bytes.len() == 8 {
|
||||
RcEntry::Present {
|
||||
count: u64::from_be_bytes(bytes.try_into().unwrap()),
|
||||
}
|
||||
} else if bytes.len() == 16 {
|
||||
RcEntry::Deletable {
|
||||
at_time: u64::from_be_bytes(bytes[8..16].try_into().unwrap()),
|
||||
}
|
||||
} else {
|
||||
panic!("Invalid RC entry: {:?}, database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug.",
|
||||
bytes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_opt<V: AsRef<[u8]>>(bytes: Option<V>) -> Self {
|
||||
bytes
|
||||
.map(|b| Self::parse(b.as_ref()))
|
||||
.unwrap_or(Self::Absent)
|
||||
}
|
||||
|
||||
fn serialize(self) -> Option<Vec<u8>> {
|
||||
match self {
|
||||
RcEntry::Present { count } => Some(u64::to_be_bytes(count).to_vec()),
|
||||
RcEntry::Deletable { at_time } => {
|
||||
Some([u64::to_be_bytes(0), u64::to_be_bytes(at_time)].concat())
|
||||
}
|
||||
RcEntry::Absent => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn increment(self) -> Self {
|
||||
let old_count = match self {
|
||||
RcEntry::Present { count } => count,
|
||||
_ => 0,
|
||||
};
|
||||
RcEntry::Present {
|
||||
count: old_count + 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn decrement(self) -> Self {
|
||||
match self {
|
||||
RcEntry::Present { count } => {
|
||||
if count > 1 {
|
||||
RcEntry::Present { count: count - 1 }
|
||||
} else {
|
||||
RcEntry::Deletable {
|
||||
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
del => del,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_zero(&self) -> bool {
|
||||
matches!(self, RcEntry::Deletable { .. } | RcEntry::Absent)
|
||||
}
|
||||
|
||||
pub(crate) fn is_nonzero(&self) -> bool {
|
||||
!self.is_zero()
|
||||
}
|
||||
|
||||
pub(crate) fn is_deletable(&self) -> bool {
|
||||
match self {
|
||||
RcEntry::Present { .. } => false,
|
||||
RcEntry::Deletable { at_time } => now_msec() > *at_time,
|
||||
RcEntry::Absent => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_needed(&self) -> bool {
|
||||
!self.is_deletable()
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue