2021-11-03 21:07:43 +00:00
use std ::convert ::TryInto ;
2021-04-23 19:57:32 +00:00
use std ::path ::{ Path , PathBuf } ;
2020-04-17 13:36:16 +00:00
use std ::sync ::Arc ;
2022-02-22 13:52:41 +00:00
use std ::time ::Duration ;
2020-04-09 21:45:07 +00:00
2020-04-17 16:51:29 +00:00
use arc_swap ::ArcSwapOption ;
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2022-02-17 22:28:23 +00:00
use serde ::{ Deserialize , Serialize } ;
use zstd ::stream ::{ decode_all as zstd_decode , Encoder } ;
2020-04-17 17:16:08 +00:00
use futures ::future ::* ;
2020-04-22 20:32:58 +00:00
use futures ::select ;
2020-04-09 21:45:07 +00:00
use tokio ::fs ;
2021-03-15 21:36:41 +00:00
use tokio ::io ::{ AsyncReadExt , AsyncWriteExt } ;
2020-04-22 20:32:58 +00:00
use tokio ::sync ::{ watch , Mutex , Notify } ;
2022-02-17 22:28:23 +00:00
2022-02-18 19:39:55 +00:00
use opentelemetry ::{
trace ::{ FutureExt as OtelFutureExt , TraceContextExt , Tracer } ,
Context , KeyValue ,
} ;
2020-04-09 21:45:07 +00:00
2020-04-24 10:10:01 +00:00
use garage_util ::data ::* ;
2021-10-26 17:13:41 +00:00
use garage_util ::error ::* ;
2022-02-22 13:52:41 +00:00
use garage_util ::metrics ::RecordDuration ;
2022-02-24 13:59:49 +00:00
use garage_util ::sled_counter ::SledCountedTree ;
2021-03-15 15:21:41 +00:00
use garage_util ::time ::* ;
2021-11-03 17:28:43 +00:00
use garage_util ::tranquilizer ::Tranquilizer ;
2020-04-18 17:39:08 +00:00
2021-10-14 09:50:12 +00:00
use garage_rpc ::system ::System ;
use garage_rpc ::* ;
2020-04-23 17:05:46 +00:00
2021-03-26 18:41:46 +00:00
use garage_table ::replication ::{ TableReplication , TableShardedReplication } ;
2020-04-23 17:05:46 +00:00
2022-02-16 13:23:04 +00:00
use crate ::block_metrics ::* ;
2020-04-24 10:10:01 +00:00
use crate ::block_ref_table ::* ;
use crate ::garage ::Garage ;
2020-04-11 21:00:26 +00:00
2021-03-26 20:53:28 +00:00
/// Size under which data will be stored inlined in database instead of as files
2020-04-18 17:30:05 +00:00
pub const INLINE_THRESHOLD : usize = 3072 ;
2022-02-25 19:42:56 +00:00
pub const BACKGROUND_TRANQUILITY : u32 = 2 ;
2021-03-15 18:51:16 +00:00
2021-10-28 12:32:55 +00:00
// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT : Duration = Duration ::from_secs ( 30 ) ;
// Timeout for RPCs that ask other nodes whether they need a copy
// of a given block before we delete it locally
2020-04-17 17:16:08 +00:00
const NEED_BLOCK_QUERY_TIMEOUT : Duration = Duration ::from_secs ( 5 ) ;
2021-10-28 12:32:55 +00:00
// The delay between the time where a resync operation fails
2022-02-25 19:42:56 +00:00
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
2021-10-28 12:32:55 +00:00
const RESYNC_RETRY_DELAY : Duration = Duration ::from_secs ( 60 ) ;
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
const BLOCK_GC_DELAY : Duration = Duration ::from_secs ( 600 ) ;
2020-04-17 17:16:08 +00:00
2021-03-26 20:53:28 +00:00
/// RPC messages used to share blocks of data between nodes
2020-04-18 17:21:34 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-10-14 09:50:12 +00:00
pub enum BlockRpc {
2020-04-18 17:21:34 +00:00
Ok ,
2021-03-26 20:53:28 +00:00
/// Message to ask for a block of data, by hash
2020-04-18 17:21:34 +00:00
GetBlock ( Hash ) ,
2021-03-26 20:53:28 +00:00
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
2021-12-15 10:26:43 +00:00
PutBlock {
hash : Hash ,
data : DataBlock ,
} ,
2021-03-26 20:53:28 +00:00
/// Ask other node if they should have this block, but don't actually have it
2020-04-18 17:21:34 +00:00
NeedBlockQuery ( Hash ) ,
2021-03-26 20:53:28 +00:00
/// Response : whether the node do require that block
2020-04-18 17:21:34 +00:00
NeedBlockReply ( bool ) ,
}
2021-12-15 10:26:43 +00:00
/// A possibly compressed block of data
2020-04-18 17:30:05 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-12-15 10:26:43 +00:00
pub enum DataBlock {
/// Uncompressed data
Plain ( #[ serde(with = " serde_bytes " ) ] Vec < u8 > ) ,
/// Data compressed with zstd
Compressed ( #[ serde(with = " serde_bytes " ) ] Vec < u8 > ) ,
}
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed ( & self ) -> bool {
matches! ( self , DataBlock ::Compressed ( _ ) )
}
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
/// instead
pub fn inner_buffer ( & self ) -> & [ u8 ] {
use DataBlock ::* ;
let ( Plain ( ref res ) | Compressed ( ref res ) ) = self ;
res
}
2020-04-18 17:30:05 +00:00
2021-12-15 10:26:43 +00:00
/// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead.
pub fn verify_get ( self , hash : Hash ) -> Result < Vec < u8 > , Error > {
match self {
DataBlock ::Plain ( data ) = > {
if blake2sum ( & data ) = = hash {
Ok ( data )
} else {
Err ( Error ::CorruptData ( hash ) )
}
}
DataBlock ::Compressed ( data ) = > {
zstd_decode ( & data [ .. ] ) . map_err ( | _ | Error ::CorruptData ( hash ) )
}
}
}
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
/// does not return the buffer content.
pub fn verify ( & self , hash : Hash ) -> Result < ( ) , Error > {
match self {
DataBlock ::Plain ( data ) = > {
if blake2sum ( data ) = = hash {
Ok ( ( ) )
} else {
Err ( Error ::CorruptData ( hash ) )
}
}
DataBlock ::Compressed ( data ) = > zstd ::stream ::copy_decode ( & data [ .. ] , std ::io ::sink ( ) )
. map_err ( | _ | Error ::CorruptData ( hash ) ) ,
}
}
pub fn from_buffer ( data : Vec < u8 > , level : Option < i32 > ) -> DataBlock {
if let Some ( level ) = level {
if let Ok ( data ) = zstd_encode ( & data [ .. ] , level ) {
return DataBlock ::Compressed ( data ) ;
}
}
DataBlock ::Plain ( data )
}
2020-04-18 17:30:05 +00:00
}
2021-10-15 09:05:09 +00:00
impl Rpc for BlockRpc {
type Response = Result < BlockRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2020-04-18 17:21:34 +00:00
2021-03-26 20:53:28 +00:00
/// The block manager, handling block exchange between nodes, and block storage on local node
2020-04-11 21:00:26 +00:00
pub struct BlockManager {
2021-03-26 20:53:28 +00:00
/// Replication strategy, allowing to find on which node blocks should be located
2020-04-23 17:05:46 +00:00
pub replication : TableShardedReplication ,
2021-03-26 20:53:28 +00:00
/// Directory in which block are stored
2020-04-11 21:00:26 +00:00
pub data_dir : PathBuf ,
2021-10-22 10:09:03 +00:00
mutation_lock : Mutex < BlockManagerLocked > ,
2020-04-22 20:32:58 +00:00
2021-03-15 18:51:16 +00:00
rc : sled ::Tree ,
2020-04-22 20:32:58 +00:00
2022-02-24 13:59:49 +00:00
resync_queue : SledCountedTree ,
2021-03-15 18:51:16 +00:00
resync_notify : Notify ,
2022-02-25 19:42:56 +00:00
resync_errors : SledCountedTree ,
2020-04-22 20:32:58 +00:00
2021-03-15 18:51:16 +00:00
system : Arc < System > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < BlockRpc , Self > > ,
2021-03-15 18:51:16 +00:00
pub ( crate ) garage : ArcSwapOption < Garage > ,
2022-02-16 13:23:04 +00:00
metrics : BlockManagerMetrics ,
2020-04-09 21:45:07 +00:00
}
2021-10-22 10:09:03 +00:00
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
struct BlockManagerLocked ( ) ;
2020-04-11 21:00:26 +00:00
impl BlockManager {
2020-04-18 17:21:34 +00:00
pub fn new (
db : & sled ::Db ,
data_dir : PathBuf ,
2020-04-23 17:05:46 +00:00
replication : TableShardedReplication ,
2020-04-18 17:21:34 +00:00
system : Arc < System > ,
) -> Arc < Self > {
2020-04-11 21:53:32 +00:00
let rc = db
. open_tree ( " block_local_rc " )
2020-04-11 21:00:26 +00:00
. expect ( " Unable to open block_local_rc tree " ) ;
2020-04-17 13:36:16 +00:00
let resync_queue = db
. open_tree ( " block_local_resync_queue " )
. expect ( " Unable to open block_local_resync_queue tree " ) ;
2022-02-24 13:59:49 +00:00
let resync_queue = SledCountedTree ::new ( resync_queue ) ;
2020-04-17 13:36:16 +00:00
2022-02-25 19:42:56 +00:00
let resync_errors = db
. open_tree ( " block_local_resync_errors " )
. expect ( " Unable to open block_local_resync_errors tree " ) ;
let resync_errors = SledCountedTree ::new ( resync_errors ) ;
2021-10-19 14:16:10 +00:00
let endpoint = system
. netapp
. endpoint ( " garage_model/block.rs/Rpc " . to_string ( ) ) ;
2020-04-18 17:21:34 +00:00
2021-10-22 10:09:03 +00:00
let manager_locked = BlockManagerLocked ( ) ;
2022-02-25 19:42:56 +00:00
let metrics = BlockManagerMetrics ::new ( resync_queue . clone ( ) , resync_errors . clone ( ) ) ;
2022-02-16 13:23:04 +00:00
2020-04-18 17:21:34 +00:00
let block_manager = Arc ::new ( Self {
2020-04-23 17:05:46 +00:00
replication ,
2020-04-22 20:32:58 +00:00
data_dir ,
2021-10-22 10:09:03 +00:00
mutation_lock : Mutex ::new ( manager_locked ) ,
2020-04-11 21:00:26 +00:00
rc ,
2020-04-17 13:36:16 +00:00
resync_queue ,
2020-04-22 20:32:58 +00:00
resync_notify : Notify ::new ( ) ,
2022-02-25 19:42:56 +00:00
resync_errors ,
2020-04-17 13:36:16 +00:00
system ,
2021-10-14 09:50:12 +00:00
endpoint ,
2020-04-17 15:09:57 +00:00
garage : ArcSwapOption ::from ( None ) ,
2022-02-16 13:23:04 +00:00
metrics ,
2020-04-18 17:21:34 +00:00
} ) ;
2021-10-14 09:50:12 +00:00
block_manager . endpoint . set_handler ( block_manager . clone ( ) ) ;
2020-04-23 14:40:59 +00:00
2021-10-14 09:50:12 +00:00
block_manager
2020-04-23 14:40:59 +00:00
}
2021-12-15 10:26:43 +00:00
/// Ask nodes that might have a (possibly compressed) block for it
async fn rpc_get_raw_block ( & self , hash : & Hash ) -> Result < DataBlock , Error > {
2021-10-26 08:20:05 +00:00
let who = self . replication . read_nodes ( hash ) ;
2021-10-22 10:09:03 +00:00
let resps = self
. system
. rpc
. try_call_many (
& self . endpoint ,
& who [ .. ] ,
BlockRpc ::GetBlock ( * hash ) ,
RequestStrategy ::with_priority ( PRIO_NORMAL )
. with_quorum ( 1 )
. with_timeout ( BLOCK_RW_TIMEOUT )
. interrupt_after_quorum ( true ) ,
)
. await ? ;
for resp in resps {
2021-12-15 10:26:43 +00:00
if let BlockRpc ::PutBlock { data , .. } = resp {
return Ok ( data ) ;
2021-10-22 10:09:03 +00:00
}
2020-04-17 18:58:10 +00:00
}
2021-10-22 10:09:03 +00:00
Err ( Error ::Message ( format! (
" Unable to read block {:?}: no valid blocks returned " ,
hash
) ) )
2020-04-11 21:00:26 +00:00
}
2021-12-15 10:26:43 +00:00
// ---- Public interface ----
/// Ask nodes that might have a block for it
pub async fn rpc_get_block ( & self , hash : & Hash ) -> Result < Vec < u8 > , Error > {
self . rpc_get_raw_block ( hash ) . await ? . verify_get ( * hash )
}
2021-10-22 10:09:03 +00:00
/// Send block to nodes that should have it
pub async fn rpc_put_block ( & self , hash : Hash , data : Vec < u8 > ) -> Result < ( ) , Error > {
let who = self . replication . write_nodes ( & hash ) ;
2021-12-15 10:26:43 +00:00
let compression_level = self
. garage
. load ( )
. as_ref ( )
. unwrap ( )
. config
. compression_level ;
let data = DataBlock ::from_buffer ( data , compression_level ) ;
2021-10-22 10:09:03 +00:00
self . system
. rpc
. try_call_many (
& self . endpoint ,
& who [ .. ] ,
2021-12-15 10:26:43 +00:00
BlockRpc ::PutBlock { hash , data } ,
2021-10-22 10:09:03 +00:00
RequestStrategy ::with_priority ( PRIO_NORMAL )
. with_quorum ( self . replication . write_quorum ( ) )
. with_timeout ( BLOCK_RW_TIMEOUT ) ,
)
. await ? ;
Ok ( ( ) )
}
2020-04-11 21:00:26 +00:00
2021-10-22 10:09:03 +00:00
/// Launch the repair procedure on the data store
///
/// This will list all blocks locally present, as well as those
/// that are required because of refcount > 0, and will try
/// to fix any mismatch between the two.
pub async fn repair_data_store ( & self , must_exit : & watch ::Receiver < bool > ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
// 1. Repair blocks from RC table.
2021-10-22 10:09:03 +00:00
let garage = self . garage . load_full ( ) . unwrap ( ) ;
let mut last_hash = None ;
for ( i , entry ) in garage . block_ref_table . data . store . iter ( ) . enumerate ( ) {
let ( _k , v_bytes ) = entry ? ;
let block_ref = rmp_serde ::decode ::from_read_ref ::< _ , BlockRef > ( v_bytes . as_ref ( ) ) ? ;
if Some ( & block_ref . block ) = = last_hash . as_ref ( ) {
continue ;
}
if ! block_ref . deleted . get ( ) {
last_hash = Some ( block_ref . block ) ;
self . put_to_resync ( & block_ref . block , Duration ::from_secs ( 0 ) ) ? ;
}
if i & 0xFF = = 0 & & * must_exit . borrow ( ) {
return Ok ( ( ) ) ;
}
}
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
// 2. Repair blocks actually on disk
2021-06-23 23:34:28 +00:00
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
self . for_each_file (
( ) ,
move | _ , hash | async move { self . put_to_resync ( & hash , Duration ::from_secs ( 0 ) ) } ,
must_exit ,
)
. await
2021-10-22 10:09:03 +00:00
}
2021-10-27 08:36:04 +00:00
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
/// this function.
pub async fn scrub_data_store (
& self ,
must_exit : & watch ::Receiver < bool > ,
2021-11-03 17:28:43 +00:00
tranquility : u32 ,
2021-10-27 08:36:04 +00:00
) -> Result < ( ) , Error > {
2021-11-03 17:28:43 +00:00
let tranquilizer = Tranquilizer ::new ( 30 ) ;
2021-10-27 08:36:04 +00:00
self . for_each_file (
2021-11-03 17:28:43 +00:00
tranquilizer ,
move | mut tranquilizer , hash | async move {
let _ = self . read_block ( & hash ) . await ;
tranquilizer . tranquilize ( tranquility ) . await ;
Ok ( tranquilizer )
2021-10-27 08:36:04 +00:00
} ,
must_exit ,
)
. await
}
2021-10-22 10:09:03 +00:00
/// Get lenght of resync queue
pub fn resync_queue_len ( & self ) -> usize {
self . resync_queue . len ( )
}
/// Get number of items in the refcount table
pub fn rc_len ( & self ) -> usize {
self . rc . len ( )
}
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
let old_rc = self
. rc
. fetch_and_update ( & hash , | old | RcEntry ::parse_opt ( old ) . increment ( ) . serialize ( ) ) ? ;
let old_rc = RcEntry ::parse_opt ( old_rc ) ;
if old_rc . is_zero ( ) {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not
// we will fecth it from someone.
self . put_to_resync ( hash , 2 * BLOCK_RW_TIMEOUT ) ? ;
2020-04-11 21:00:26 +00:00
}
2021-10-22 10:09:03 +00:00
Ok ( ( ) )
}
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
/// Decrement the number of time a block is used
pub fn block_decref ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
let new_rc = self
. rc
. update_and_fetch ( & hash , | old | RcEntry ::parse_opt ( old ) . decrement ( ) . serialize ( ) ) ? ;
let new_rc = RcEntry ::parse_opt ( new_rc ) ;
if let RcEntry ::Deletable { .. } = new_rc {
self . put_to_resync ( hash , BLOCK_GC_DELAY + Duration ::from_secs ( 10 ) ) ? ;
2021-10-22 10:09:03 +00:00
}
Ok ( ( ) )
}
2020-04-11 21:00:26 +00:00
2021-10-22 10:09:03 +00:00
/// Read a block's reference count
2021-10-28 12:32:55 +00:00
fn get_block_rc ( & self , hash : & Hash ) -> Result < RcEntry , Error > {
Ok ( RcEntry ::parse_opt ( self . rc . get ( hash . as_ref ( ) ) ? ) )
}
/// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed
fn clear_deleted_block_rc ( & self , hash : & Hash ) -> Result < ( ) , Error > {
let now = now_msec ( ) ;
self . rc . update_and_fetch ( & hash , | rcval | {
let updated = match RcEntry ::parse_opt ( rcval ) {
RcEntry ::Deletable { at_time } if now > at_time = > RcEntry ::Absent ,
v = > v ,
} ;
updated . serialize ( )
} ) ? ;
Ok ( ( ) )
2021-10-22 10:09:03 +00:00
}
// ---- Reading and writing blocks locally ----
/// Write a block to disk
2021-12-15 10:26:43 +00:00
async fn write_block ( & self , hash : & Hash , data : & DataBlock ) -> Result < BlockRpc , Error > {
2022-02-16 13:23:04 +00:00
let write_size = data . inner_buffer ( ) . len ( ) as u64 ;
let res = self
. mutation_lock
2021-10-22 10:09:03 +00:00
. lock ( )
. await
. write_block ( hash , data , self )
2022-02-22 12:53:59 +00:00
. bound_record_duration ( & self . metrics . block_write_duration )
2022-02-16 13:23:04 +00:00
. await ? ;
self . metrics . bytes_written . add ( write_size ) ;
Ok ( res )
2020-04-09 21:45:07 +00:00
}
2021-03-26 20:53:28 +00:00
/// Read block from disk, verifying it's integrity
2021-10-14 09:50:12 +00:00
async fn read_block ( & self , hash : & Hash ) -> Result < BlockRpc , Error > {
2022-02-22 13:52:41 +00:00
let data = self
. read_block_internal ( hash )
2022-02-22 12:53:59 +00:00
. bound_record_duration ( & self . metrics . block_read_duration )
. await ? ;
2022-02-22 13:52:41 +00:00
self . metrics
. bytes_read
. add ( data . inner_buffer ( ) . len ( ) as u64 ) ;
2022-02-22 12:53:59 +00:00
Ok ( BlockRpc ::PutBlock { hash : * hash , data } )
}
2022-02-16 13:23:04 +00:00
2022-02-22 12:53:59 +00:00
async fn read_block_internal ( & self , hash : & Hash ) -> Result < DataBlock , Error > {
2021-12-15 10:26:43 +00:00
let mut path = self . block_path ( hash ) ;
let compressed = match self . is_block_compressed ( hash ) . await {
Ok ( c ) = > c ,
2020-04-17 16:38:11 +00:00
Err ( e ) = > {
// Not found but maybe we should have had it ??
2021-10-28 12:32:55 +00:00
self . put_to_resync ( hash , 2 * BLOCK_RW_TIMEOUT ) ? ;
2020-04-17 16:38:11 +00:00
return Err ( Into ::into ( e ) ) ;
}
} ;
2021-12-15 10:26:43 +00:00
if compressed {
path . set_extension ( " zst " ) ;
}
let mut f = fs ::File ::open ( & path ) . await ? ;
2020-04-11 21:00:26 +00:00
let mut data = vec! [ ] ;
f . read_to_end ( & mut data ) . await ? ;
2020-04-17 13:36:16 +00:00
drop ( f ) ;
2021-12-15 10:26:43 +00:00
let data = if compressed {
DataBlock ::Compressed ( data )
} else {
DataBlock ::Plain ( data )
} ;
if data . verify ( * hash ) . is_err ( ) {
2022-02-16 13:23:04 +00:00
self . metrics . corruption_counter . add ( 1 ) ;
2021-10-22 10:09:03 +00:00
self . mutation_lock
. lock ( )
. await
. move_block_to_corrupted ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
self . put_to_resync ( hash , Duration ::from_millis ( 0 ) ) ? ;
2020-04-21 17:08:42 +00:00
return Err ( Error ::CorruptData ( * hash ) ) ;
2020-04-17 13:36:16 +00:00
}
2020-04-11 21:00:26 +00:00
2022-02-22 12:53:59 +00:00
Ok ( data )
2020-04-11 21:00:26 +00:00
}
2020-04-09 21:45:07 +00:00
2021-03-26 20:53:28 +00:00
/// Check if this node should have a block, but don't actually have it
2021-04-06 03:25:28 +00:00
async fn need_block ( & self , hash : & Hash ) -> Result < bool , Error > {
2021-10-22 10:09:03 +00:00
let BlockStatus { exists , needed } = self
. mutation_lock
. lock ( )
. await
. check_block_status ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
Ok ( needed . is_nonzero ( ) & & ! exists )
2020-04-17 17:16:08 +00:00
}
2021-10-22 10:09:03 +00:00
/// Utility: gives the path of the directory in which a block should be found
2020-04-11 21:00:26 +00:00
fn block_dir ( & self , hash : & Hash ) -> PathBuf {
let mut path = self . data_dir . clone ( ) ;
path . push ( hex ::encode ( & hash . as_slice ( ) [ 0 .. 1 ] ) ) ;
path . push ( hex ::encode ( & hash . as_slice ( ) [ 1 .. 2 ] ) ) ;
path
}
2021-10-22 10:09:03 +00:00
2021-12-15 10:26:43 +00:00
/// Utility: give the full path where a block should be found, minus extension if block is
/// compressed
2020-04-17 17:20:17 +00:00
fn block_path ( & self , hash : & Hash ) -> PathBuf {
let mut path = self . block_dir ( hash ) ;
path . push ( hex ::encode ( hash . as_ref ( ) ) ) ;
path
}
2020-04-09 21:45:07 +00:00
2021-12-15 10:26:43 +00:00
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed ( & self , hash : & Hash ) -> Result < bool , Error > {
let mut path = self . block_path ( hash ) ;
path . set_extension ( " zst " ) ;
if fs ::metadata ( & path ) . await . is_ok ( ) {
return Ok ( true ) ;
}
path . set_extension ( " " ) ;
fs ::metadata ( & path ) . await . map ( | _ | false ) . map_err ( Into ::into )
}
2021-10-22 10:09:03 +00:00
// ---- Resync loop ----
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
pub fn spawn_background_worker ( self : Arc < Self > ) {
2022-03-01 10:57:18 +00:00
// Launch a background workers for background resync loop processing
let background = self . system . background . clone ( ) ;
tokio ::spawn ( async move {
tokio ::time ::sleep ( Duration ::from_secs ( 10 ) ) . await ;
background . spawn_worker ( " block resync worker " . into ( ) , move | must_exit | {
self . resync_loop ( must_exit )
2021-10-22 10:09:03 +00:00
} ) ;
2022-03-01 10:57:18 +00:00
} ) ;
2020-04-17 13:36:16 +00:00
}
2021-03-15 13:46:37 +00:00
fn put_to_resync ( & self , hash : & Hash , delay : Duration ) -> Result < ( ) , Error > {
let when = now_msec ( ) + delay . as_millis ( ) as u64 ;
2022-02-25 19:42:56 +00:00
self . put_to_resync_at ( hash , when )
}
fn put_to_resync_at ( & self , hash : & Hash , when : u64 ) -> Result < ( ) , Error > {
2020-04-21 12:54:55 +00:00
trace! ( " Put resync_queue: {} {:?} " , when , hash ) ;
2020-04-17 15:09:57 +00:00
let mut key = u64 ::to_be_bytes ( when ) . to_vec ( ) ;
key . extend ( hash . as_ref ( ) ) ;
self . resync_queue . insert ( key , hash . as_ref ( ) ) ? ;
2021-03-15 21:36:41 +00:00
self . resync_notify . notify_waiters ( ) ;
2020-04-17 15:09:57 +00:00
Ok ( ( ) )
}
2021-03-15 22:14:12 +00:00
async fn resync_loop ( self : Arc < Self > , mut must_exit : watch ::Receiver < bool > ) {
2021-11-03 17:28:43 +00:00
let mut tranquilizer = Tranquilizer ::new ( 30 ) ;
2020-04-17 13:36:16 +00:00
while ! * must_exit . borrow ( ) {
2021-11-03 17:28:43 +00:00
match self . resync_iter ( & mut must_exit ) . await {
Ok ( true ) = > {
tranquilizer . tranquilize ( BACKGROUND_TRANQUILITY ) . await ;
}
Ok ( false ) = > {
tranquilizer . reset ( ) ;
}
Err ( e ) = > {
// The errors that we have here are only Sled errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
error! (
" Could not do a resync iteration: {} (this is a very bad error) " ,
e
) ;
tranquilizer . reset ( ) ;
2021-03-15 22:14:12 +00:00
}
2021-03-15 19:09:44 +00:00
}
}
}
2021-11-03 17:28:43 +00:00
async fn resync_iter ( & self , must_exit : & mut watch ::Receiver < bool > ) -> Result < bool , Error > {
2022-03-01 13:55:37 +00:00
if let Some ( first_pair_res ) = self . resync_queue . iter ( ) . next ( ) {
let ( time_bytes , hash_bytes ) = first_pair_res ? ;
2021-11-03 21:07:43 +00:00
let time_msec = u64 ::from_be_bytes ( time_bytes [ 0 .. 8 ] . try_into ( ) . unwrap ( ) ) ;
2021-03-15 19:09:44 +00:00
let now = now_msec ( ) ;
2022-03-01 13:55:37 +00:00
2021-03-15 19:09:44 +00:00
if now > = time_msec {
let hash = Hash ::try_from ( & hash_bytes [ .. ] ) . unwrap ( ) ;
2022-02-18 19:39:55 +00:00
2022-02-25 19:42:56 +00:00
if let Some ( ec ) = self . resync_errors . get ( hash . as_slice ( ) ) ? {
let ec = ErrorCounter ::decode ( ec ) ;
if now < ec . next_try ( ) {
// if next retry after an error is not yet,
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self . put_to_resync_at ( & hash , ec . next_try ( ) ) ? ;
2022-03-01 13:55:37 +00:00
// ec.next_try() > now >= time_msec, so this remove
// is not removing the one we added just above
self . resync_queue . remove ( time_bytes ) ? ;
2022-02-25 19:42:56 +00:00
return Ok ( false ) ;
}
}
2022-02-18 19:39:55 +00:00
let tracer = opentelemetry ::global ::tracer ( " garage " ) ;
let trace_id = gen_uuid ( ) ;
let span = tracer
. span_builder ( " Resync block " )
. with_trace_id (
opentelemetry ::trace ::TraceId ::from_hex ( & hex ::encode (
& trace_id . as_slice ( ) [ .. 16 ] ,
) )
. unwrap ( ) ,
)
. with_attributes ( vec! [ KeyValue ::new ( " block " , format! ( " {:?} " , hash ) ) ] )
. start ( & tracer ) ;
let res = self
. resync_block ( & hash )
. with_context ( Context ::current_with_span ( span ) )
2022-02-22 12:53:59 +00:00
. bound_record_duration ( & self . metrics . resync_duration )
2022-02-18 19:39:55 +00:00
. await ;
2022-02-16 13:23:04 +00:00
self . metrics . resync_counter . add ( 1 ) ;
2021-03-15 19:09:44 +00:00
if let Err ( e ) = & res {
2022-02-16 13:23:04 +00:00
self . metrics . resync_error_counter . add ( 1 ) ;
2021-03-15 19:09:44 +00:00
warn! ( " Error when resyncing {:?}: {} " , hash , e ) ;
2022-02-25 19:42:56 +00:00
let err_counter = match self . resync_errors . get ( hash . as_slice ( ) ) ? {
2022-03-01 13:55:37 +00:00
Some ( ec ) = > ErrorCounter ::decode ( ec ) . add1 ( now + 1 ) ,
None = > ErrorCounter ::new ( now + 1 ) ,
2022-02-25 19:42:56 +00:00
} ;
self . resync_errors
. insert ( hash . as_slice ( ) , err_counter . encode ( ) ) ? ;
2022-03-01 13:55:37 +00:00
self . put_to_resync_at ( & hash , err_counter . next_try ( ) ) ? ;
// err_counter.next_try() >= now + 1 > now,
// the entry we remove from the queue is not
// the entry we inserted with put_to_resync_at
self . resync_queue . remove ( time_bytes ) ? ;
2022-02-25 19:42:56 +00:00
} else {
self . resync_errors . remove ( hash . as_slice ( ) ) ? ;
2022-03-01 13:55:37 +00:00
self . resync_queue . remove ( time_bytes ) ? ;
2020-04-22 20:32:58 +00:00
}
2022-02-25 19:42:56 +00:00
2021-11-03 17:28:43 +00:00
Ok ( true )
2020-04-22 20:32:58 +00:00
} else {
2021-03-15 21:36:41 +00:00
let delay = tokio ::time ::sleep ( Duration ::from_millis ( time_msec - now ) ) ;
2020-04-22 20:32:58 +00:00
select! {
2021-04-23 19:57:32 +00:00
_ = delay . fuse ( ) = > { } ,
_ = self . resync_notify . notified ( ) . fuse ( ) = > { } ,
_ = must_exit . changed ( ) . fuse ( ) = > { } ,
2020-04-17 13:36:16 +00:00
}
2021-11-03 17:28:43 +00:00
Ok ( false )
2020-04-11 21:00:26 +00:00
}
2021-03-15 19:09:44 +00:00
} else {
select! {
2021-04-23 19:57:32 +00:00
_ = self . resync_notify . notified ( ) . fuse ( ) = > { } ,
_ = must_exit . changed ( ) . fuse ( ) = > { } ,
2021-03-15 19:09:44 +00:00
}
2021-11-03 17:28:43 +00:00
Ok ( false )
2020-04-11 21:00:26 +00:00
}
}
2020-04-17 13:36:16 +00:00
2021-03-15 19:09:44 +00:00
async fn resync_block ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-22 10:09:03 +00:00
let BlockStatus { exists , needed } = self
. mutation_lock
. lock ( )
. await
. check_block_status ( hash , self )
. await ? ;
2020-04-17 13:36:16 +00:00
2021-10-28 12:32:55 +00:00
if exists ! = needed . is_needed ( ) | | exists ! = needed . is_nonzero ( ) {
debug! (
" Resync block {:?}: exists {}, nonzero rc {}, deletable {} " ,
hash ,
exists ,
needed . is_nonzero ( ) ,
needed . is_deletable ( ) ,
2020-04-21 12:54:55 +00:00
) ;
}
2020-04-17 15:09:57 +00:00
2021-10-28 12:32:55 +00:00
if exists & & needed . is_deletable ( ) {
info! ( " Resync block {:?}: offloading and deleting " , hash ) ;
2021-02-24 10:58:03 +00:00
2021-10-26 08:20:05 +00:00
let mut who = self . replication . write_nodes ( hash ) ;
2021-03-16 10:14:27 +00:00
if who . len ( ) < self . replication . write_quorum ( ) {
2021-04-23 19:57:32 +00:00
return Err ( Error ::Message ( " Not trying to offload block because we don't have a quorum of nodes to write to " . to_string ( ) ) ) ;
2021-03-11 18:06:27 +00:00
}
2021-02-24 10:58:03 +00:00
who . retain ( | id | * id ! = self . system . id ) ;
2021-10-14 09:50:12 +00:00
let msg = Arc ::new ( BlockRpc ::NeedBlockQuery ( * hash ) ) ;
2021-02-24 10:58:03 +00:00
let who_needs_fut = who . iter ( ) . map ( | to | {
2021-10-14 09:50:12 +00:00
self . system . rpc . call_arc (
& self . endpoint ,
* to ,
msg . clone ( ) ,
2021-10-26 17:13:41 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND )
2021-10-14 09:50:12 +00:00
. with_timeout ( NEED_BLOCK_QUERY_TIMEOUT ) ,
)
2021-02-24 10:58:03 +00:00
} ) ;
let who_needs_resps = join_all ( who_needs_fut ) . await ;
let mut need_nodes = vec! [ ] ;
for ( node , needed ) in who . iter ( ) . zip ( who_needs_resps . into_iter ( ) ) {
2021-10-26 17:13:41 +00:00
match needed . err_context ( " NeedBlockQuery RPC " ) ? {
2021-10-14 09:50:12 +00:00
BlockRpc ::NeedBlockReply ( needed ) = > {
2021-02-24 10:58:03 +00:00
if needed {
need_nodes . push ( * node ) ;
2020-04-17 17:16:08 +00:00
}
}
2022-01-03 12:58:05 +00:00
m = > {
return Err ( Error ::unexpected_rpc_message ( m ) ) ;
2021-02-24 10:58:03 +00:00
}
2020-04-17 17:16:08 +00:00
}
2021-02-24 10:58:03 +00:00
}
2021-04-23 19:57:32 +00:00
if ! need_nodes . is_empty ( ) {
2021-03-05 14:09:18 +00:00
trace! (
" Block {:?} needed by {} nodes, sending " ,
hash ,
need_nodes . len ( )
) ;
2020-04-17 17:16:08 +00:00
2022-02-16 13:23:04 +00:00
for node in need_nodes . iter ( ) {
self . metrics
. resync_send_counter
. add ( 1 , & [ KeyValue ::new ( " to " , format! ( " {:?} " , node ) ) ] ) ;
}
2021-10-28 12:32:55 +00:00
let put_block_message = self . read_block ( hash ) . await ? ;
2021-10-14 09:50:12 +00:00
self . system
. rpc
2021-03-12 20:52:19 +00:00
. try_call_many (
2021-10-14 09:50:12 +00:00
& self . endpoint ,
2021-03-12 18:57:37 +00:00
& need_nodes [ .. ] ,
put_block_message ,
2021-10-26 17:13:41 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND )
2021-10-14 09:50:12 +00:00
. with_quorum ( need_nodes . len ( ) )
2021-03-12 20:52:19 +00:00
. with_timeout ( BLOCK_RW_TIMEOUT ) ,
)
2021-10-28 12:32:55 +00:00
. await
. err_context ( " PutBlock RPC " ) ? ;
2020-04-17 15:09:57 +00:00
}
2021-03-16 10:14:27 +00:00
info! (
2021-10-28 12:32:55 +00:00
" Deleting unneeded block {:?}, offload finished ({} / {}) " ,
2021-03-05 14:09:18 +00:00
hash ,
need_nodes . len ( ) ,
who . len ( )
) ;
2021-02-24 10:58:03 +00:00
2021-10-22 10:09:03 +00:00
self . mutation_lock
. lock ( )
. await
. delete_if_unneeded ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
self . clear_deleted_block_rc ( hash ) ? ;
2020-04-17 13:36:16 +00:00
}
2021-10-28 12:32:55 +00:00
if needed . is_nonzero ( ) & & ! exists {
info! (
" Resync block {:?}: fetching absent but needed block (refcount > 0) " ,
hash
) ;
2021-12-15 10:26:43 +00:00
let block_data = self . rpc_get_raw_block ( hash ) . await ? ;
2022-02-16 13:23:04 +00:00
self . metrics . resync_recv_counter . add ( 1 ) ;
2021-12-15 10:26:43 +00:00
self . write_block ( hash , & block_data ) . await ? ;
2020-04-17 13:36:16 +00:00
}
Ok ( ( ) )
}
2020-04-18 17:21:34 +00:00
2021-10-28 12:32:55 +00:00
// ---- Utility: iteration on files in the data directory ----
2021-06-23 23:34:28 +00:00
async fn for_each_file < F , Fut , State > (
& self ,
state : State ,
mut f : F ,
must_exit : & watch ::Receiver < bool > ,
) -> Result < ( ) , Error >
where
F : FnMut ( State , Hash ) -> Fut + Send ,
Fut : Future < Output = Result < State , Error > > + Send ,
State : Send ,
{
self . for_each_file_rec ( & self . data_dir , state , & mut f , must_exit )
. await
. map ( | _ | ( ) )
}
fn for_each_file_rec < ' a , F , Fut , State > (
2021-03-05 14:09:18 +00:00
& ' a self ,
2021-04-23 19:57:32 +00:00
path : & ' a Path ,
2021-06-23 23:34:28 +00:00
mut state : State ,
f : & ' a mut F ,
2021-03-05 14:09:18 +00:00
must_exit : & ' a watch ::Receiver < bool > ,
2021-06-23 23:34:28 +00:00
) -> BoxFuture < ' a , Result < State , Error > >
where
F : FnMut ( State , Hash ) -> Fut + Send ,
Fut : Future < Output = Result < State , Error > > + Send ,
State : Send + ' a ,
{
2021-02-24 10:58:03 +00:00
async move {
let mut ls_data_dir = fs ::read_dir ( path ) . await ? ;
2021-06-23 23:34:28 +00:00
while let Some ( data_dir_ent ) = ls_data_dir . next_entry ( ) . await ? {
if * must_exit . borrow ( ) {
break ;
}
2021-02-24 10:58:03 +00:00
let name = data_dir_ent . file_name ( ) ;
2021-06-23 23:34:28 +00:00
let name = if let Ok ( n ) = name . into_string ( ) {
n
} else {
continue ;
2020-04-19 20:36:36 +00:00
} ;
2021-02-24 10:58:03 +00:00
let ent_type = data_dir_ent . file_type ( ) . await ? ;
2021-12-15 10:26:43 +00:00
let name = name . strip_suffix ( " .zst " ) . unwrap_or ( & name ) ;
2021-02-24 10:58:03 +00:00
if name . len ( ) = = 2 & & hex ::decode ( & name ) . is_ok ( ) & & ent_type . is_dir ( ) {
2021-06-23 23:34:28 +00:00
state = self
. for_each_file_rec ( & data_dir_ent . path ( ) , state , f , must_exit )
2021-03-05 14:09:18 +00:00
. await ? ;
2021-02-24 10:58:03 +00:00
} else if name . len ( ) = = 64 {
2021-06-23 23:34:28 +00:00
let hash_bytes = if let Ok ( h ) = hex ::decode ( & name ) {
h
} else {
continue ;
2021-02-24 10:58:03 +00:00
} ;
let mut hash = [ 0 u8 ; 32 ] ;
hash . copy_from_slice ( & hash_bytes [ .. ] ) ;
2021-06-23 23:34:28 +00:00
state = f ( state , hash . into ( ) ) . await ? ;
2020-04-19 20:36:36 +00:00
}
}
2021-06-23 23:34:28 +00:00
Ok ( state )
2021-03-05 14:09:18 +00:00
}
. boxed ( )
2020-04-19 20:36:36 +00:00
}
2020-04-17 13:36:16 +00:00
}
2021-10-14 09:50:12 +00:00
#[ async_trait ]
impl EndpointHandler < BlockRpc > for BlockManager {
2021-10-15 09:05:09 +00:00
async fn handle (
self : & Arc < Self > ,
message : & BlockRpc ,
_from : NodeID ,
) -> Result < BlockRpc , Error > {
match message {
2021-12-15 10:26:43 +00:00
BlockRpc ::PutBlock { hash , data } = > self . write_block ( hash , data ) . await ,
2021-10-15 09:05:09 +00:00
BlockRpc ::GetBlock ( h ) = > self . read_block ( h ) . await ,
BlockRpc ::NeedBlockQuery ( h ) = > self . need_block ( h ) . await . map ( BlockRpc ::NeedBlockReply ) ,
2022-01-03 12:58:05 +00:00
m = > Err ( Error ::unexpected_rpc_message ( m ) ) ,
2021-10-15 09:05:09 +00:00
}
2021-10-14 09:50:12 +00:00
}
}
2021-10-22 10:09:03 +00:00
struct BlockStatus {
exists : bool ,
2021-10-28 12:32:55 +00:00
needed : RcEntry ,
2021-10-22 10:09:03 +00:00
}
impl BlockManagerLocked {
async fn check_block_status (
& self ,
hash : & Hash ,
mgr : & BlockManager ,
) -> Result < BlockStatus , Error > {
2021-12-15 10:26:43 +00:00
let exists = mgr . is_block_compressed ( hash ) . await . is_ok ( ) ;
2021-10-28 12:32:55 +00:00
let needed = mgr . get_block_rc ( hash ) ? ;
2021-10-22 10:09:03 +00:00
Ok ( BlockStatus { exists , needed } )
}
async fn write_block (
& self ,
hash : & Hash ,
2021-12-15 10:26:43 +00:00
data : & DataBlock ,
2021-10-22 10:09:03 +00:00
mgr : & BlockManager ,
) -> Result < BlockRpc , Error > {
2021-12-15 10:26:43 +00:00
let compressed = data . is_compressed ( ) ;
let data = data . inner_buffer ( ) ;
2021-10-22 10:09:03 +00:00
let mut path = mgr . block_dir ( hash ) ;
2022-03-01 10:52:12 +00:00
let directory = path . clone ( ) ;
2021-10-22 10:09:03 +00:00
path . push ( hex ::encode ( hash ) ) ;
2022-03-01 10:52:12 +00:00
fs ::create_dir_all ( & directory ) . await ? ;
2021-12-15 10:26:43 +00:00
let to_delete = match ( mgr . is_block_compressed ( hash ) . await , compressed ) {
( Ok ( true ) , _ ) = > return Ok ( BlockRpc ::Ok ) ,
( Ok ( false ) , false ) = > return Ok ( BlockRpc ::Ok ) ,
( Ok ( false ) , true ) = > {
let path_to_delete = path . clone ( ) ;
path . set_extension ( " zst " ) ;
Some ( path_to_delete )
}
( Err ( _ ) , compressed ) = > {
if compressed {
path . set_extension ( " zst " ) ;
}
None
}
} ;
2021-10-22 10:09:03 +00:00
let mut path2 = path . clone ( ) ;
path2 . set_extension ( " tmp " ) ;
let mut f = fs ::File ::create ( & path2 ) . await ? ;
f . write_all ( data ) . await ? ;
2022-03-01 10:52:12 +00:00
f . sync_all ( ) . await ? ;
2021-10-22 10:09:03 +00:00
drop ( f ) ;
fs ::rename ( path2 , path ) . await ? ;
2021-12-15 10:26:43 +00:00
if let Some ( to_delete ) = to_delete {
fs ::remove_file ( to_delete ) . await ? ;
}
2021-10-22 10:09:03 +00:00
2022-03-14 10:54:00 +00:00
// We want to ensure that when this function returns, data is properly persisted
// to disk. The first step is the sync_all above that does an fsync on the data file.
// Now, we do an fsync on the containing directory, to ensure that the rename
// is persisted properly. See:
// http://thedjbway.b0llix.net/qmail/syncdir.html
2022-03-01 10:52:12 +00:00
let dir = fs ::OpenOptions ::new ( )
. read ( true )
. mode ( 0 )
. open ( directory )
. await ? ;
dir . sync_all ( ) . await ? ;
drop ( dir ) ;
2021-10-22 10:09:03 +00:00
Ok ( BlockRpc ::Ok )
}
async fn move_block_to_corrupted ( & self , hash : & Hash , mgr : & BlockManager ) -> Result < ( ) , Error > {
warn! (
" Block {:?} is corrupted. Renaming to .corrupted and resyncing. " ,
hash
) ;
2021-12-15 10:26:43 +00:00
let mut path = mgr . block_path ( hash ) ;
2021-10-22 10:09:03 +00:00
let mut path2 = path . clone ( ) ;
2021-12-15 10:26:43 +00:00
if mgr . is_block_compressed ( hash ) . await ? {
path . set_extension ( " zst " ) ;
path2 . set_extension ( " zst.corrupted " ) ;
} else {
path2 . set_extension ( " corrupted " ) ;
}
2021-10-22 10:09:03 +00:00
fs ::rename ( path , path2 ) . await ? ;
Ok ( ( ) )
}
async fn delete_if_unneeded ( & self , hash : & Hash , mgr : & BlockManager ) -> Result < ( ) , Error > {
let BlockStatus { exists , needed } = self . check_block_status ( hash , mgr ) . await ? ;
2021-10-28 12:32:55 +00:00
if exists & & needed . is_deletable ( ) {
2021-12-15 10:26:43 +00:00
let mut path = mgr . block_path ( hash ) ;
if mgr . is_block_compressed ( hash ) . await ? {
path . set_extension ( " zst " ) ;
}
2021-10-22 10:09:03 +00:00
fs ::remove_file ( path ) . await ? ;
2022-02-16 13:23:04 +00:00
mgr . metrics . delete_counter . add ( 1 ) ;
2021-10-22 10:09:03 +00:00
}
Ok ( ( ) )
}
}
2021-10-28 12:32:55 +00:00
/// Describes the state of the reference counter for a block
#[ derive(Clone, Copy, Debug) ]
enum RcEntry {
/// Present: the block has `count` references, with `count` > 0.
///
/// This is stored as u64::to_be_bytes(count)
Present { count : u64 } ,
/// Deletable: the block has zero references, and can be deleted
/// once time (returned by now_msec) is larger than at_time
/// (in millis since Unix epoch)
///
/// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
/// (this allows for the data format to be backwards compatible with
/// previous Garage versions that didn't have this intermediate state)
Deletable { at_time : u64 } ,
/// Absent: the block has zero references, and can be deleted
/// immediately
Absent ,
}
impl RcEntry {
fn parse ( bytes : & [ u8 ] ) -> Self {
if bytes . len ( ) = = 8 {
RcEntry ::Present {
count : u64 ::from_be_bytes ( bytes . try_into ( ) . unwrap ( ) ) ,
}
} else if bytes . len ( ) = = 16 {
RcEntry ::Deletable {
at_time : u64 ::from_be_bytes ( bytes [ 8 .. 16 ] . try_into ( ) . unwrap ( ) ) ,
}
} else {
panic! ( " Invalid RC entry: {:?} , database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug. " ,
bytes
)
}
}
fn parse_opt < V : AsRef < [ u8 ] > > ( bytes : Option < V > ) -> Self {
bytes
. map ( | b | Self ::parse ( b . as_ref ( ) ) )
. unwrap_or ( Self ::Absent )
}
fn serialize ( self ) -> Option < Vec < u8 > > {
match self {
RcEntry ::Present { count } = > Some ( u64 ::to_be_bytes ( count ) . to_vec ( ) ) ,
RcEntry ::Deletable { at_time } = > {
Some ( [ u64 ::to_be_bytes ( 0 ) , u64 ::to_be_bytes ( at_time ) ] . concat ( ) )
}
RcEntry ::Absent = > None ,
}
}
fn increment ( self ) -> Self {
let old_count = match self {
RcEntry ::Present { count } = > count ,
_ = > 0 ,
} ;
RcEntry ::Present {
count : old_count + 1 ,
}
}
fn decrement ( self ) -> Self {
match self {
RcEntry ::Present { count } = > {
if count > 1 {
RcEntry ::Present { count : count - 1 }
} else {
RcEntry ::Deletable {
at_time : now_msec ( ) + BLOCK_GC_DELAY . as_millis ( ) as u64 ,
}
}
}
del = > del ,
}
}
fn is_zero ( & self ) -> bool {
matches! ( self , RcEntry ::Deletable { .. } | RcEntry ::Absent )
}
fn is_nonzero ( & self ) -> bool {
! self . is_zero ( )
}
fn is_deletable ( & self ) -> bool {
match self {
RcEntry ::Present { .. } = > false ,
RcEntry ::Deletable { at_time } = > now_msec ( ) > * at_time ,
RcEntry ::Absent = > true ,
}
}
fn is_needed ( & self ) -> bool {
! self . is_deletable ( )
}
}
2021-12-15 10:26:43 +00:00
2022-02-25 19:42:56 +00:00
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.
#[ derive(Clone, Copy, Debug) ]
struct ErrorCounter {
errors : u64 ,
last_try : u64 ,
}
2022-03-01 13:55:37 +00:00
impl ErrorCounter {
fn new ( now : u64 ) -> Self {
2022-02-25 19:42:56 +00:00
Self {
errors : 1 ,
2022-03-01 13:55:37 +00:00
last_try : now ,
2022-02-25 19:42:56 +00:00
}
}
fn decode ( data : sled ::IVec ) -> Self {
Self {
errors : u64 ::from_be_bytes ( data [ 0 .. 8 ] . try_into ( ) . unwrap ( ) ) ,
last_try : u64 ::from_be_bytes ( data [ 8 .. 16 ] . try_into ( ) . unwrap ( ) ) ,
}
}
fn encode ( & self ) -> Vec < u8 > {
[
u64 ::to_be_bytes ( self . errors ) ,
u64 ::to_be_bytes ( self . last_try ) ,
]
. concat ( )
}
2022-03-01 13:55:37 +00:00
fn add1 ( self , now : u64 ) -> Self {
2022-02-25 19:42:56 +00:00
Self {
errors : self . errors + 1 ,
2022-03-01 13:55:37 +00:00
last_try : now ,
2022-02-25 19:42:56 +00:00
}
}
fn delay_msec ( & self ) -> u64 {
( RESYNC_RETRY_DELAY . as_millis ( ) as u64 ) < < std ::cmp ::min ( self . errors - 1 , 10 )
}
fn next_try ( & self ) -> u64 {
self . last_try + self . delay_msec ( )
}
}
2021-12-15 10:26:43 +00:00
fn zstd_encode < R : std ::io ::Read > ( mut source : R , level : i32 ) -> std ::io ::Result < Vec < u8 > > {
let mut result = Vec ::< u8 > ::new ( ) ;
let mut encoder = Encoder ::new ( & mut result , level ) ? ;
encoder . include_checksum ( true ) ? ;
std ::io ::copy ( & mut source , & mut encoder ) ? ;
encoder . finish ( ) ? ;
Ok ( result )
}