2021-11-03 21:07:43 +00:00
use std ::convert ::TryInto ;
2021-04-23 19:57:32 +00:00
use std ::path ::{ Path , PathBuf } ;
2020-04-17 13:36:16 +00:00
use std ::sync ::Arc ;
use std ::time ::Duration ;
2020-04-09 21:45:07 +00:00
2020-04-17 16:51:29 +00:00
use arc_swap ::ArcSwapOption ;
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2020-04-17 17:16:08 +00:00
use futures ::future ::* ;
2020-04-22 20:32:58 +00:00
use futures ::select ;
2020-04-18 17:21:34 +00:00
use serde ::{ Deserialize , Serialize } ;
2020-04-09 21:45:07 +00:00
use tokio ::fs ;
2021-03-15 21:36:41 +00:00
use tokio ::io ::{ AsyncReadExt , AsyncWriteExt } ;
2020-04-22 20:32:58 +00:00
use tokio ::sync ::{ watch , Mutex , Notify } ;
2020-04-09 21:45:07 +00:00
2020-04-24 10:10:01 +00:00
use garage_util ::data ::* ;
2021-10-26 17:13:41 +00:00
use garage_util ::error ::* ;
2021-03-15 15:21:41 +00:00
use garage_util ::time ::* ;
2021-11-03 17:28:43 +00:00
use garage_util ::tranquilizer ::Tranquilizer ;
2020-04-18 17:39:08 +00:00
2021-10-14 09:50:12 +00:00
use garage_rpc ::system ::System ;
use garage_rpc ::* ;
2020-04-23 17:05:46 +00:00
2021-03-26 18:41:46 +00:00
use garage_table ::replication ::{ TableReplication , TableShardedReplication } ;
2020-04-23 17:05:46 +00:00
2020-04-24 10:10:01 +00:00
use crate ::block_ref_table ::* ;
2020-04-23 17:05:46 +00:00
2020-04-24 10:10:01 +00:00
use crate ::garage ::Garage ;
2020-04-11 21:00:26 +00:00
2021-03-26 20:53:28 +00:00
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;

/// Number of background resync workers started by
/// `BlockManager::spawn_background_worker`
pub const BACKGROUND_WORKERS: u64 = 1;
/// Tranquility value handed to the tranquilizer of the background resync loop
pub const BACKGROUND_TRANQUILITY: u32 = 3;

/// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout for RPCs that ask other nodes whether they need a copy
/// of a given block before we delete it locally
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);

/// The delay between the time where a resync operation fails
/// and the time when it is retried.
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);

/// The delay between the moment when the reference counter
/// drops to zero, and the moment where we allow ourselves
/// to delete the block locally.
const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
2020-04-17 17:16:08 +00:00
2021-03-26 20:53:28 +00:00
/// RPC messages used to share blocks of data between nodes
2020-04-18 17:21:34 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-10-14 09:50:12 +00:00
pub enum BlockRpc {
2020-04-18 17:21:34 +00:00
Ok ,
2021-03-26 20:53:28 +00:00
/// Message to ask for a block of data, by hash
2020-04-18 17:21:34 +00:00
GetBlock ( Hash ) ,
2021-03-26 20:53:28 +00:00
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
2020-04-18 17:21:34 +00:00
PutBlock ( PutBlockMessage ) ,
2021-03-26 20:53:28 +00:00
/// Ask other node if they should have this block, but don't actually have it
2020-04-18 17:21:34 +00:00
NeedBlockQuery ( Hash ) ,
2021-03-26 20:53:28 +00:00
/// Response : whether the node do require that block
2020-04-18 17:21:34 +00:00
NeedBlockReply ( bool ) ,
}
2021-03-26 20:53:28 +00:00
/// Structure used to send a block
2020-04-18 17:30:05 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
pub struct PutBlockMessage {
2021-03-26 20:53:28 +00:00
/// Hash of the block
2020-04-18 17:30:05 +00:00
pub hash : Hash ,
2021-03-26 20:53:28 +00:00
/// Content of the block
2020-04-18 17:30:05 +00:00
#[ serde(with = " serde_bytes " ) ]
pub data : Vec < u8 > ,
}
2021-10-15 09:05:09 +00:00
impl Rpc for BlockRpc {
type Response = Result < BlockRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2020-04-18 17:21:34 +00:00
2021-03-26 20:53:28 +00:00
/// The block manager, handling block exchange between nodes, and block storage on local node
2020-04-11 21:00:26 +00:00
pub struct BlockManager {
2021-03-26 20:53:28 +00:00
/// Replication strategy, allowing to find on which node blocks should be located
2020-04-23 17:05:46 +00:00
pub replication : TableShardedReplication ,
2021-03-26 20:53:28 +00:00
/// Directory in which block are stored
2020-04-11 21:00:26 +00:00
pub data_dir : PathBuf ,
2021-10-22 10:09:03 +00:00
mutation_lock : Mutex < BlockManagerLocked > ,
2020-04-22 20:32:58 +00:00
2021-03-15 18:51:16 +00:00
rc : sled ::Tree ,
2020-04-22 20:32:58 +00:00
2021-03-15 18:51:16 +00:00
resync_queue : sled ::Tree ,
resync_notify : Notify ,
2020-04-22 20:32:58 +00:00
2021-03-15 18:51:16 +00:00
system : Arc < System > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < BlockRpc , Self > > ,
2021-03-15 18:51:16 +00:00
pub ( crate ) garage : ArcSwapOption < Garage > ,
2020-04-09 21:45:07 +00:00
}
2021-10-22 10:09:03 +00:00
// Zero-sized marker type whose methods are the operations that must only be
// run while the mutation lock is held. Because the only instance lives INSIDE
// the `Mutex`, reaching those methods requires taking the lock first.
struct BlockManagerLocked();
2020-04-11 21:00:26 +00:00
impl BlockManager {
2020-04-18 17:21:34 +00:00
pub fn new (
db : & sled ::Db ,
data_dir : PathBuf ,
2020-04-23 17:05:46 +00:00
replication : TableShardedReplication ,
2020-04-18 17:21:34 +00:00
system : Arc < System > ,
) -> Arc < Self > {
2020-04-11 21:53:32 +00:00
let rc = db
. open_tree ( " block_local_rc " )
2020-04-11 21:00:26 +00:00
. expect ( " Unable to open block_local_rc tree " ) ;
2020-04-17 13:36:16 +00:00
let resync_queue = db
. open_tree ( " block_local_resync_queue " )
. expect ( " Unable to open block_local_resync_queue tree " ) ;
2021-10-19 14:16:10 +00:00
let endpoint = system
. netapp
. endpoint ( " garage_model/block.rs/Rpc " . to_string ( ) ) ;
2020-04-18 17:21:34 +00:00
2021-10-22 10:09:03 +00:00
let manager_locked = BlockManagerLocked ( ) ;
2020-04-18 17:21:34 +00:00
let block_manager = Arc ::new ( Self {
2020-04-23 17:05:46 +00:00
replication ,
2020-04-22 20:32:58 +00:00
data_dir ,
2021-10-22 10:09:03 +00:00
mutation_lock : Mutex ::new ( manager_locked ) ,
2020-04-11 21:00:26 +00:00
rc ,
2020-04-17 13:36:16 +00:00
resync_queue ,
2020-04-22 20:32:58 +00:00
resync_notify : Notify ::new ( ) ,
2020-04-17 13:36:16 +00:00
system ,
2021-10-14 09:50:12 +00:00
endpoint ,
2020-04-17 15:09:57 +00:00
garage : ArcSwapOption ::from ( None ) ,
2020-04-18 17:21:34 +00:00
} ) ;
2021-10-14 09:50:12 +00:00
block_manager . endpoint . set_handler ( block_manager . clone ( ) ) ;
2020-04-23 14:40:59 +00:00
2021-10-14 09:50:12 +00:00
block_manager
2020-04-23 14:40:59 +00:00
}
2021-10-22 10:09:03 +00:00
// ---- Public interface ----
/// Ask nodes that might have a block for it
pub async fn rpc_get_block ( & self , hash : & Hash ) -> Result < Vec < u8 > , Error > {
2021-10-26 08:20:05 +00:00
let who = self . replication . read_nodes ( hash ) ;
2021-10-22 10:09:03 +00:00
let resps = self
. system
. rpc
. try_call_many (
& self . endpoint ,
& who [ .. ] ,
BlockRpc ::GetBlock ( * hash ) ,
RequestStrategy ::with_priority ( PRIO_NORMAL )
. with_quorum ( 1 )
. with_timeout ( BLOCK_RW_TIMEOUT )
. interrupt_after_quorum ( true ) ,
)
. await ? ;
for resp in resps {
if let BlockRpc ::PutBlock ( msg ) = resp {
return Ok ( msg . data ) ;
}
2020-04-17 18:58:10 +00:00
}
2021-10-22 10:09:03 +00:00
Err ( Error ::Message ( format! (
" Unable to read block {:?}: no valid blocks returned " ,
hash
) ) )
2020-04-11 21:00:26 +00:00
}
2021-10-22 10:09:03 +00:00
/// Send block to nodes that should have it
pub async fn rpc_put_block ( & self , hash : Hash , data : Vec < u8 > ) -> Result < ( ) , Error > {
let who = self . replication . write_nodes ( & hash ) ;
self . system
. rpc
. try_call_many (
& self . endpoint ,
& who [ .. ] ,
BlockRpc ::PutBlock ( PutBlockMessage { hash , data } ) ,
RequestStrategy ::with_priority ( PRIO_NORMAL )
. with_quorum ( self . replication . write_quorum ( ) )
. with_timeout ( BLOCK_RW_TIMEOUT ) ,
)
. await ? ;
Ok ( ( ) )
}
2020-04-11 21:00:26 +00:00
2021-10-22 10:09:03 +00:00
/// Launch the repair procedure on the data store
///
/// This will list all blocks locally present, as well as those
/// that are required because of refcount > 0, and will try
/// to fix any mismatch between the two.
pub async fn repair_data_store ( & self , must_exit : & watch ::Receiver < bool > ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
// 1. Repair blocks from RC table.
2021-10-22 10:09:03 +00:00
let garage = self . garage . load_full ( ) . unwrap ( ) ;
let mut last_hash = None ;
for ( i , entry ) in garage . block_ref_table . data . store . iter ( ) . enumerate ( ) {
let ( _k , v_bytes ) = entry ? ;
let block_ref = rmp_serde ::decode ::from_read_ref ::< _ , BlockRef > ( v_bytes . as_ref ( ) ) ? ;
if Some ( & block_ref . block ) = = last_hash . as_ref ( ) {
continue ;
}
if ! block_ref . deleted . get ( ) {
last_hash = Some ( block_ref . block ) ;
self . put_to_resync ( & block_ref . block , Duration ::from_secs ( 0 ) ) ? ;
}
if i & 0xFF = = 0 & & * must_exit . borrow ( ) {
return Ok ( ( ) ) ;
}
}
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
// 2. Repair blocks actually on disk
2021-06-23 23:34:28 +00:00
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
self . for_each_file (
( ) ,
move | _ , hash | async move { self . put_to_resync ( & hash , Duration ::from_secs ( 0 ) ) } ,
must_exit ,
)
. await
2021-10-22 10:09:03 +00:00
}
2021-10-27 08:36:04 +00:00
/// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
/// this function.
pub async fn scrub_data_store (
& self ,
must_exit : & watch ::Receiver < bool > ,
2021-11-03 17:28:43 +00:00
tranquility : u32 ,
2021-10-27 08:36:04 +00:00
) -> Result < ( ) , Error > {
2021-11-03 17:28:43 +00:00
let tranquilizer = Tranquilizer ::new ( 30 ) ;
2021-10-27 08:36:04 +00:00
self . for_each_file (
2021-11-03 17:28:43 +00:00
tranquilizer ,
move | mut tranquilizer , hash | async move {
let _ = self . read_block ( & hash ) . await ;
tranquilizer . tranquilize ( tranquility ) . await ;
Ok ( tranquilizer )
2021-10-27 08:36:04 +00:00
} ,
must_exit ,
)
. await
}
2021-10-22 10:09:03 +00:00
/// Number of entries currently in the resync queue
pub fn resync_queue_len(&self) -> usize {
    let queue = &self.resync_queue;
    queue.len()
}
/// Number of entries currently in the reference counter table
pub fn rc_len(&self) -> usize {
    let table = &self.rc;
    table.len()
}
// ---- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
let old_rc = self
. rc
. fetch_and_update ( & hash , | old | RcEntry ::parse_opt ( old ) . increment ( ) . serialize ( ) ) ? ;
let old_rc = RcEntry ::parse_opt ( old_rc ) ;
if old_rc . is_zero ( ) {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not
// we will fecth it from someone.
self . put_to_resync ( hash , 2 * BLOCK_RW_TIMEOUT ) ? ;
2020-04-11 21:00:26 +00:00
}
2021-10-22 10:09:03 +00:00
Ok ( ( ) )
}
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
/// Decrement the number of time a block is used
pub fn block_decref ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-28 12:32:55 +00:00
let new_rc = self
. rc
. update_and_fetch ( & hash , | old | RcEntry ::parse_opt ( old ) . decrement ( ) . serialize ( ) ) ? ;
let new_rc = RcEntry ::parse_opt ( new_rc ) ;
if let RcEntry ::Deletable { .. } = new_rc {
self . put_to_resync ( hash , BLOCK_GC_DELAY + Duration ::from_secs ( 10 ) ) ? ;
2021-10-22 10:09:03 +00:00
}
Ok ( ( ) )
}
2020-04-11 21:00:26 +00:00
2021-10-22 10:09:03 +00:00
/// Read a block's reference count
2021-10-28 12:32:55 +00:00
fn get_block_rc ( & self , hash : & Hash ) -> Result < RcEntry , Error > {
Ok ( RcEntry ::parse_opt ( self . rc . get ( hash . as_ref ( ) ) ? ) )
}
/// Delete an entry in the RC table if it is deletable and the
/// deletion time has passed
fn clear_deleted_block_rc ( & self , hash : & Hash ) -> Result < ( ) , Error > {
let now = now_msec ( ) ;
self . rc . update_and_fetch ( & hash , | rcval | {
let updated = match RcEntry ::parse_opt ( rcval ) {
RcEntry ::Deletable { at_time } if now > at_time = > RcEntry ::Absent ,
v = > v ,
} ;
updated . serialize ( )
} ) ? ;
Ok ( ( ) )
2021-10-22 10:09:03 +00:00
}
// ---- Reading and writing blocks locally ----
/// Write a block to disk
async fn write_block ( & self , hash : & Hash , data : & [ u8 ] ) -> Result < BlockRpc , Error > {
self . mutation_lock
. lock ( )
. await
. write_block ( hash , data , self )
. await
2020-04-09 21:45:07 +00:00
}
2021-03-26 20:53:28 +00:00
/// Read block from disk, verifying it's integrity
2021-10-14 09:50:12 +00:00
async fn read_block ( & self , hash : & Hash ) -> Result < BlockRpc , Error > {
2020-04-17 17:20:17 +00:00
let path = self . block_path ( hash ) ;
2020-04-09 21:45:07 +00:00
2020-04-17 16:38:11 +00:00
let mut f = match fs ::File ::open ( & path ) . await {
Ok ( f ) = > f ,
Err ( e ) = > {
// Not found but maybe we should have had it ??
2021-10-28 12:32:55 +00:00
self . put_to_resync ( hash , 2 * BLOCK_RW_TIMEOUT ) ? ;
2020-04-17 16:38:11 +00:00
return Err ( Into ::into ( e ) ) ;
}
} ;
2020-04-11 21:00:26 +00:00
let mut data = vec! [ ] ;
f . read_to_end ( & mut data ) . await ? ;
2020-04-17 13:36:16 +00:00
drop ( f ) ;
2021-03-15 15:21:41 +00:00
if blake2sum ( & data [ .. ] ) ! = * hash {
2021-10-22 10:09:03 +00:00
self . mutation_lock
. lock ( )
. await
. move_block_to_corrupted ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
self . put_to_resync ( hash , Duration ::from_millis ( 0 ) ) ? ;
2020-04-21 17:08:42 +00:00
return Err ( Error ::CorruptData ( * hash ) ) ;
2020-04-17 13:36:16 +00:00
}
2020-04-11 21:00:26 +00:00
2021-10-14 09:50:12 +00:00
Ok ( BlockRpc ::PutBlock ( PutBlockMessage { hash : * hash , data } ) )
2020-04-11 21:00:26 +00:00
}
2020-04-09 21:45:07 +00:00
2021-03-26 20:53:28 +00:00
/// Check if this node should have a block, but don't actually have it
2021-04-06 03:25:28 +00:00
async fn need_block ( & self , hash : & Hash ) -> Result < bool , Error > {
2021-10-22 10:09:03 +00:00
let BlockStatus { exists , needed } = self
. mutation_lock
. lock ( )
. await
. check_block_status ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
Ok ( needed . is_nonzero ( ) & & ! exists )
2020-04-17 17:16:08 +00:00
}
2021-10-22 10:09:03 +00:00
/// Utility: gives the path of the directory in which a block should be found
2020-04-11 21:00:26 +00:00
fn block_dir ( & self , hash : & Hash ) -> PathBuf {
let mut path = self . data_dir . clone ( ) ;
path . push ( hex ::encode ( & hash . as_slice ( ) [ 0 .. 1 ] ) ) ;
path . push ( hex ::encode ( & hash . as_slice ( ) [ 1 .. 2 ] ) ) ;
path
}
2021-10-22 10:09:03 +00:00
/// Utility: give the full path where a block should be found
2020-04-17 17:20:17 +00:00
fn block_path ( & self , hash : & Hash ) -> PathBuf {
let mut path = self . block_dir ( hash ) ;
path . push ( hex ::encode ( hash . as_ref ( ) ) ) ;
path
}
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
// ---- Resync loop ----
2020-04-09 21:45:07 +00:00
2021-10-22 10:09:03 +00:00
pub fn spawn_background_worker ( self : Arc < Self > ) {
2021-10-28 12:32:55 +00:00
// Launch n simultaneous workers for background resync loop preprocessing
2021-10-22 10:09:03 +00:00
for i in 0 .. BACKGROUND_WORKERS {
let bm2 = self . clone ( ) ;
let background = self . system . background . clone ( ) ;
tokio ::spawn ( async move {
tokio ::time ::sleep ( Duration ::from_secs ( 10 * ( i + 1 ) ) ) . await ;
background . spawn_worker ( format! ( " block resync worker {} " , i ) , move | must_exit | {
bm2 . resync_loop ( must_exit )
} ) ;
} ) ;
2020-04-17 13:36:16 +00:00
}
}
2021-03-15 13:46:37 +00:00
fn put_to_resync ( & self , hash : & Hash , delay : Duration ) -> Result < ( ) , Error > {
let when = now_msec ( ) + delay . as_millis ( ) as u64 ;
2020-04-21 12:54:55 +00:00
trace! ( " Put resync_queue: {} {:?} " , when , hash ) ;
2020-04-17 15:09:57 +00:00
let mut key = u64 ::to_be_bytes ( when ) . to_vec ( ) ;
key . extend ( hash . as_ref ( ) ) ;
self . resync_queue . insert ( key , hash . as_ref ( ) ) ? ;
2021-03-15 21:36:41 +00:00
self . resync_notify . notify_waiters ( ) ;
2020-04-17 15:09:57 +00:00
Ok ( ( ) )
}
2021-03-15 22:14:12 +00:00
async fn resync_loop ( self : Arc < Self > , mut must_exit : watch ::Receiver < bool > ) {
2021-11-03 17:28:43 +00:00
let mut tranquilizer = Tranquilizer ::new ( 30 ) ;
2020-04-17 13:36:16 +00:00
while ! * must_exit . borrow ( ) {
2021-11-03 17:28:43 +00:00
match self . resync_iter ( & mut must_exit ) . await {
Ok ( true ) = > {
tranquilizer . tranquilize ( BACKGROUND_TRANQUILITY ) . await ;
}
Ok ( false ) = > {
tranquilizer . reset ( ) ;
}
Err ( e ) = > {
// The errors that we have here are only Sled errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
error! (
" Could not do a resync iteration: {} (this is a very bad error) " ,
e
) ;
tranquilizer . reset ( ) ;
2021-03-15 22:14:12 +00:00
}
2021-03-15 19:09:44 +00:00
}
}
}
2021-11-03 17:28:43 +00:00
async fn resync_iter ( & self , must_exit : & mut watch ::Receiver < bool > ) -> Result < bool , Error > {
2021-10-26 17:13:41 +00:00
if let Some ( ( time_bytes , hash_bytes ) ) = self . resync_queue . pop_min ( ) ? {
2021-11-03 21:07:43 +00:00
let time_msec = u64 ::from_be_bytes ( time_bytes [ 0 .. 8 ] . try_into ( ) . unwrap ( ) ) ;
2021-03-15 19:09:44 +00:00
let now = now_msec ( ) ;
if now > = time_msec {
let hash = Hash ::try_from ( & hash_bytes [ .. ] ) . unwrap ( ) ;
let res = self . resync_block ( & hash ) . await ;
if let Err ( e ) = & res {
warn! ( " Error when resyncing {:?}: {} " , hash , e ) ;
2021-10-28 12:32:55 +00:00
self . put_to_resync ( & hash , RESYNC_RETRY_DELAY ) ? ;
2020-04-22 20:32:58 +00:00
}
2021-11-03 17:28:43 +00:00
Ok ( true )
2020-04-22 20:32:58 +00:00
} else {
2021-10-26 17:13:41 +00:00
self . resync_queue . insert ( time_bytes , hash_bytes ) ? ;
2021-03-15 21:36:41 +00:00
let delay = tokio ::time ::sleep ( Duration ::from_millis ( time_msec - now ) ) ;
2020-04-22 20:32:58 +00:00
select! {
2021-04-23 19:57:32 +00:00
_ = delay . fuse ( ) = > { } ,
_ = self . resync_notify . notified ( ) . fuse ( ) = > { } ,
_ = must_exit . changed ( ) . fuse ( ) = > { } ,
2020-04-17 13:36:16 +00:00
}
2021-11-03 17:28:43 +00:00
Ok ( false )
2020-04-11 21:00:26 +00:00
}
2021-03-15 19:09:44 +00:00
} else {
select! {
2021-04-23 19:57:32 +00:00
_ = self . resync_notify . notified ( ) . fuse ( ) = > { } ,
_ = must_exit . changed ( ) . fuse ( ) = > { } ,
2021-03-15 19:09:44 +00:00
}
2021-11-03 17:28:43 +00:00
Ok ( false )
2020-04-11 21:00:26 +00:00
}
}
2020-04-17 13:36:16 +00:00
2021-03-15 19:09:44 +00:00
async fn resync_block ( & self , hash : & Hash ) -> Result < ( ) , Error > {
2021-10-22 10:09:03 +00:00
let BlockStatus { exists , needed } = self
. mutation_lock
. lock ( )
. await
. check_block_status ( hash , self )
. await ? ;
2020-04-17 13:36:16 +00:00
2021-10-28 12:32:55 +00:00
if exists ! = needed . is_needed ( ) | | exists ! = needed . is_nonzero ( ) {
debug! (
" Resync block {:?}: exists {}, nonzero rc {}, deletable {} " ,
hash ,
exists ,
needed . is_nonzero ( ) ,
needed . is_deletable ( ) ,
2020-04-21 12:54:55 +00:00
) ;
}
2020-04-17 15:09:57 +00:00
2021-10-28 12:32:55 +00:00
if exists & & needed . is_deletable ( ) {
info! ( " Resync block {:?}: offloading and deleting " , hash ) ;
2021-02-24 10:58:03 +00:00
2021-10-26 08:20:05 +00:00
let mut who = self . replication . write_nodes ( hash ) ;
2021-03-16 10:14:27 +00:00
if who . len ( ) < self . replication . write_quorum ( ) {
2021-04-23 19:57:32 +00:00
return Err ( Error ::Message ( " Not trying to offload block because we don't have a quorum of nodes to write to " . to_string ( ) ) ) ;
2021-03-11 18:06:27 +00:00
}
2021-02-24 10:58:03 +00:00
who . retain ( | id | * id ! = self . system . id ) ;
2021-10-14 09:50:12 +00:00
let msg = Arc ::new ( BlockRpc ::NeedBlockQuery ( * hash ) ) ;
2021-02-24 10:58:03 +00:00
let who_needs_fut = who . iter ( ) . map ( | to | {
2021-10-14 09:50:12 +00:00
self . system . rpc . call_arc (
& self . endpoint ,
* to ,
msg . clone ( ) ,
2021-10-26 17:13:41 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND )
2021-10-14 09:50:12 +00:00
. with_timeout ( NEED_BLOCK_QUERY_TIMEOUT ) ,
)
2021-02-24 10:58:03 +00:00
} ) ;
let who_needs_resps = join_all ( who_needs_fut ) . await ;
let mut need_nodes = vec! [ ] ;
for ( node , needed ) in who . iter ( ) . zip ( who_needs_resps . into_iter ( ) ) {
2021-10-26 17:13:41 +00:00
match needed . err_context ( " NeedBlockQuery RPC " ) ? {
2021-10-14 09:50:12 +00:00
BlockRpc ::NeedBlockReply ( needed ) = > {
2021-02-24 10:58:03 +00:00
if needed {
need_nodes . push ( * node ) ;
2020-04-17 17:16:08 +00:00
}
}
2021-02-24 10:58:03 +00:00
_ = > {
2021-04-23 19:57:32 +00:00
return Err ( Error ::Message (
" Unexpected response to NeedBlockQuery RPC " . to_string ( ) ,
) ) ;
2021-02-24 10:58:03 +00:00
}
2020-04-17 17:16:08 +00:00
}
2021-02-24 10:58:03 +00:00
}
2021-04-23 19:57:32 +00:00
if ! need_nodes . is_empty ( ) {
2021-03-05 14:09:18 +00:00
trace! (
" Block {:?} needed by {} nodes, sending " ,
hash ,
need_nodes . len ( )
) ;
2020-04-17 17:16:08 +00:00
2021-10-28 12:32:55 +00:00
let put_block_message = self . read_block ( hash ) . await ? ;
2021-10-14 09:50:12 +00:00
self . system
. rpc
2021-03-12 20:52:19 +00:00
. try_call_many (
2021-10-14 09:50:12 +00:00
& self . endpoint ,
2021-03-12 18:57:37 +00:00
& need_nodes [ .. ] ,
put_block_message ,
2021-10-26 17:13:41 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND )
2021-10-14 09:50:12 +00:00
. with_quorum ( need_nodes . len ( ) )
2021-03-12 20:52:19 +00:00
. with_timeout ( BLOCK_RW_TIMEOUT ) ,
)
2021-10-28 12:32:55 +00:00
. await
. err_context ( " PutBlock RPC " ) ? ;
2020-04-17 15:09:57 +00:00
}
2021-03-16 10:14:27 +00:00
info! (
2021-10-28 12:32:55 +00:00
" Deleting unneeded block {:?}, offload finished ({} / {}) " ,
2021-03-05 14:09:18 +00:00
hash ,
need_nodes . len ( ) ,
who . len ( )
) ;
2021-02-24 10:58:03 +00:00
2021-10-22 10:09:03 +00:00
self . mutation_lock
. lock ( )
. await
. delete_if_unneeded ( hash , self )
. await ? ;
2021-10-28 12:32:55 +00:00
self . clear_deleted_block_rc ( hash ) ? ;
2020-04-17 13:36:16 +00:00
}
2021-10-28 12:32:55 +00:00
if needed . is_nonzero ( ) & & ! exists {
info! (
" Resync block {:?}: fetching absent but needed block (refcount > 0) " ,
hash
) ;
2021-10-26 08:20:05 +00:00
let block_data = self . rpc_get_block ( hash ) . await ? ;
2020-04-17 13:36:16 +00:00
self . write_block ( hash , & block_data [ .. ] ) . await ? ;
}
Ok ( ( ) )
}
2020-04-18 17:21:34 +00:00
2021-10-28 12:32:55 +00:00
// ---- Utility: iteration on files in the data directory ----
2021-06-23 23:34:28 +00:00
async fn for_each_file < F , Fut , State > (
& self ,
state : State ,
mut f : F ,
must_exit : & watch ::Receiver < bool > ,
) -> Result < ( ) , Error >
where
F : FnMut ( State , Hash ) -> Fut + Send ,
Fut : Future < Output = Result < State , Error > > + Send ,
State : Send ,
{
self . for_each_file_rec ( & self . data_dir , state , & mut f , must_exit )
. await
. map ( | _ | ( ) )
}
fn for_each_file_rec < ' a , F , Fut , State > (
2021-03-05 14:09:18 +00:00
& ' a self ,
2021-04-23 19:57:32 +00:00
path : & ' a Path ,
2021-06-23 23:34:28 +00:00
mut state : State ,
f : & ' a mut F ,
2021-03-05 14:09:18 +00:00
must_exit : & ' a watch ::Receiver < bool > ,
2021-06-23 23:34:28 +00:00
) -> BoxFuture < ' a , Result < State , Error > >
where
F : FnMut ( State , Hash ) -> Fut + Send ,
Fut : Future < Output = Result < State , Error > > + Send ,
State : Send + ' a ,
{
2021-02-24 10:58:03 +00:00
async move {
let mut ls_data_dir = fs ::read_dir ( path ) . await ? ;
2021-06-23 23:34:28 +00:00
while let Some ( data_dir_ent ) = ls_data_dir . next_entry ( ) . await ? {
if * must_exit . borrow ( ) {
break ;
}
2021-02-24 10:58:03 +00:00
let name = data_dir_ent . file_name ( ) ;
2021-06-23 23:34:28 +00:00
let name = if let Ok ( n ) = name . into_string ( ) {
n
} else {
continue ;
2020-04-19 20:36:36 +00:00
} ;
2021-02-24 10:58:03 +00:00
let ent_type = data_dir_ent . file_type ( ) . await ? ;
if name . len ( ) = = 2 & & hex ::decode ( & name ) . is_ok ( ) & & ent_type . is_dir ( ) {
2021-06-23 23:34:28 +00:00
state = self
. for_each_file_rec ( & data_dir_ent . path ( ) , state , f , must_exit )
2021-03-05 14:09:18 +00:00
. await ? ;
2021-02-24 10:58:03 +00:00
} else if name . len ( ) = = 64 {
2021-06-23 23:34:28 +00:00
let hash_bytes = if let Ok ( h ) = hex ::decode ( & name ) {
h
} else {
continue ;
2021-02-24 10:58:03 +00:00
} ;
let mut hash = [ 0 u8 ; 32 ] ;
hash . copy_from_slice ( & hash_bytes [ .. ] ) ;
2021-06-23 23:34:28 +00:00
state = f ( state , hash . into ( ) ) . await ? ;
2020-04-19 20:36:36 +00:00
}
}
2021-06-23 23:34:28 +00:00
Ok ( state )
2021-03-05 14:09:18 +00:00
}
. boxed ( )
2020-04-19 20:36:36 +00:00
}
2020-04-17 13:36:16 +00:00
}
2021-10-14 09:50:12 +00:00
#[ async_trait ]
impl EndpointHandler < BlockRpc > for BlockManager {
2021-10-15 09:05:09 +00:00
async fn handle (
self : & Arc < Self > ,
message : & BlockRpc ,
_from : NodeID ,
) -> Result < BlockRpc , Error > {
match message {
BlockRpc ::PutBlock ( m ) = > self . write_block ( & m . hash , & m . data ) . await ,
BlockRpc ::GetBlock ( h ) = > self . read_block ( h ) . await ,
BlockRpc ::NeedBlockQuery ( h ) = > self . need_block ( h ) . await . map ( BlockRpc ::NeedBlockReply ) ,
_ = > Err ( Error ::BadRpc ( " Unexpected RPC message " . to_string ( ) ) ) ,
}
2021-10-14 09:50:12 +00:00
}
}
2021-10-22 10:09:03 +00:00
struct BlockStatus {
exists : bool ,
2021-10-28 12:32:55 +00:00
needed : RcEntry ,
2021-10-22 10:09:03 +00:00
}
impl BlockManagerLocked {
async fn check_block_status (
& self ,
hash : & Hash ,
mgr : & BlockManager ,
) -> Result < BlockStatus , Error > {
let path = mgr . block_path ( hash ) ;
let exists = fs ::metadata ( & path ) . await . is_ok ( ) ;
2021-10-28 12:32:55 +00:00
let needed = mgr . get_block_rc ( hash ) ? ;
2021-10-22 10:09:03 +00:00
Ok ( BlockStatus { exists , needed } )
}
async fn write_block (
& self ,
hash : & Hash ,
data : & [ u8 ] ,
mgr : & BlockManager ,
) -> Result < BlockRpc , Error > {
let mut path = mgr . block_dir ( hash ) ;
fs ::create_dir_all ( & path ) . await ? ;
path . push ( hex ::encode ( hash ) ) ;
if fs ::metadata ( & path ) . await . is_ok ( ) {
return Ok ( BlockRpc ::Ok ) ;
}
let mut path2 = path . clone ( ) ;
path2 . set_extension ( " tmp " ) ;
let mut f = fs ::File ::create ( & path2 ) . await ? ;
f . write_all ( data ) . await ? ;
drop ( f ) ;
fs ::rename ( path2 , path ) . await ? ;
Ok ( BlockRpc ::Ok )
}
async fn move_block_to_corrupted ( & self , hash : & Hash , mgr : & BlockManager ) -> Result < ( ) , Error > {
warn! (
" Block {:?} is corrupted. Renaming to .corrupted and resyncing. " ,
hash
) ;
let path = mgr . block_path ( hash ) ;
let mut path2 = path . clone ( ) ;
2021-06-23 23:34:28 +00:00
path2 . set_extension ( " corrupted " ) ;
2021-10-22 10:09:03 +00:00
fs ::rename ( path , path2 ) . await ? ;
Ok ( ( ) )
}
async fn delete_if_unneeded ( & self , hash : & Hash , mgr : & BlockManager ) -> Result < ( ) , Error > {
let BlockStatus { exists , needed } = self . check_block_status ( hash , mgr ) . await ? ;
2021-10-28 12:32:55 +00:00
if exists & & needed . is_deletable ( ) {
2021-10-22 10:09:03 +00:00
let path = mgr . block_path ( hash ) ;
fs ::remove_file ( path ) . await ? ;
}
Ok ( ( ) )
}
}
2021-10-28 12:32:55 +00:00
/// Describes the state of the reference counter for a block
#[ derive(Clone, Copy, Debug) ]
enum RcEntry {
/// Present: the block has `count` references, with `count` > 0.
///
/// This is stored as u64::to_be_bytes(count)
Present { count : u64 } ,
/// Deletable: the block has zero references, and can be deleted
/// once time (returned by now_msec) is larger than at_time
/// (in millis since Unix epoch)
///
/// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
/// (this allows for the data format to be backwards compatible with
/// previous Garage versions that didn't have this intermediate state)
Deletable { at_time : u64 } ,
/// Absent: the block has zero references, and can be deleted
/// immediately
Absent ,
}
impl RcEntry {
fn parse ( bytes : & [ u8 ] ) -> Self {
if bytes . len ( ) = = 8 {
RcEntry ::Present {
count : u64 ::from_be_bytes ( bytes . try_into ( ) . unwrap ( ) ) ,
}
} else if bytes . len ( ) = = 16 {
RcEntry ::Deletable {
at_time : u64 ::from_be_bytes ( bytes [ 8 .. 16 ] . try_into ( ) . unwrap ( ) ) ,
}
} else {
panic! ( " Invalid RC entry: {:?} , database is corrupted. This is an error Garage is currently unable to recover from. Sorry, and also please report a bug. " ,
bytes
)
}
}
fn parse_opt < V : AsRef < [ u8 ] > > ( bytes : Option < V > ) -> Self {
bytes
. map ( | b | Self ::parse ( b . as_ref ( ) ) )
. unwrap_or ( Self ::Absent )
}
fn serialize ( self ) -> Option < Vec < u8 > > {
match self {
RcEntry ::Present { count } = > Some ( u64 ::to_be_bytes ( count ) . to_vec ( ) ) ,
RcEntry ::Deletable { at_time } = > {
Some ( [ u64 ::to_be_bytes ( 0 ) , u64 ::to_be_bytes ( at_time ) ] . concat ( ) )
}
RcEntry ::Absent = > None ,
}
}
fn increment ( self ) -> Self {
let old_count = match self {
RcEntry ::Present { count } = > count ,
_ = > 0 ,
} ;
RcEntry ::Present {
count : old_count + 1 ,
}
}
fn decrement ( self ) -> Self {
match self {
RcEntry ::Present { count } = > {
if count > 1 {
RcEntry ::Present { count : count - 1 }
} else {
RcEntry ::Deletable {
at_time : now_msec ( ) + BLOCK_GC_DELAY . as_millis ( ) as u64 ,
}
}
}
del = > del ,
}
}
fn is_zero ( & self ) -> bool {
matches! ( self , RcEntry ::Deletable { .. } | RcEntry ::Absent )
}
fn is_nonzero ( & self ) -> bool {
! self . is_zero ( )
}
fn is_deletable ( & self ) -> bool {
match self {
RcEntry ::Present { .. } = > false ,
RcEntry ::Deletable { at_time } = > now_msec ( ) > * at_time ,
RcEntry ::Absent = > true ,
}
}
fn is_needed ( & self ) -> bool {
! self . is_deletable ( )
}
}