2021-03-11 17:28:03 +00:00
use std ::collections ::VecDeque ;
2022-07-08 11:30:26 +00:00
use std ::sync ::Arc ;
2021-03-11 17:28:03 +00:00
use std ::time ::{ Duration , Instant } ;
2022-12-14 11:28:07 +00:00
use arc_swap ::ArcSwapOption ;
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2021-03-11 17:28:03 +00:00
use futures_util ::stream ::* ;
2022-02-16 13:23:04 +00:00
use opentelemetry ::KeyValue ;
2021-03-11 17:28:03 +00:00
use rand ::Rng ;
use serde ::{ Deserialize , Serialize } ;
use serde_bytes ::ByteBuf ;
2022-07-08 11:30:26 +00:00
use tokio ::select ;
2021-03-11 17:28:03 +00:00
use tokio ::sync ::{ mpsc , watch } ;
2022-07-08 11:30:26 +00:00
use garage_util ::background ::* ;
2021-03-11 17:28:03 +00:00
use garage_util ::data ::* ;
2022-12-14 11:28:07 +00:00
use garage_util ::error ::{ Error , OkOrMessage } ;
2021-03-11 17:28:03 +00:00
2021-03-16 11:18:03 +00:00
use garage_rpc ::ring ::* ;
2021-10-14 09:50:12 +00:00
use garage_rpc ::system ::System ;
use garage_rpc ::* ;
2021-03-12 14:05:26 +00:00
2021-03-11 17:28:03 +00:00
use crate ::data ::* ;
use crate ::merkle ::* ;
use crate ::replication ::* ;
use crate ::* ;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL : Duration = Duration ::from_secs ( 10 * 60 ) ;
2021-10-14 09:50:12 +00:00
pub struct TableSyncer < F : TableSchema + 'static , R : TableReplication + 'static > {
2021-03-16 10:43:58 +00:00
system : Arc < System > ,
data : Arc < TableData < F , R > > ,
merkle : Arc < MerkleUpdater < F , R > > ,
2021-03-11 17:28:03 +00:00
2022-12-14 11:28:07 +00:00
add_full_sync_tx : ArcSwapOption < mpsc ::UnboundedSender < ( ) > > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < SyncRpc , Self > > ,
2021-03-11 17:28:03 +00:00
}
#[ derive(Serialize, Deserialize) ]
2021-05-02 21:13:08 +00:00
pub ( crate ) enum SyncRpc {
2021-03-16 11:18:03 +00:00
RootCkHash ( Partition , Hash ) ,
RootCkDifferent ( bool ) ,
2021-03-11 17:28:03 +00:00
GetNode ( MerkleNodeKey ) ,
Node ( MerkleNodeKey , MerkleNode ) ,
2021-03-12 14:05:26 +00:00
Items ( Vec < Arc < ByteBuf > > ) ,
Ok ,
2021-03-11 17:28:03 +00:00
}
2021-10-15 09:05:09 +00:00
impl Rpc for SyncRpc {
type Response = Result < SyncRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2021-03-12 14:05:26 +00:00
2021-03-11 17:28:03 +00:00
#[ derive(Debug, Clone) ]
struct TodoPartition {
2021-03-16 11:18:03 +00:00
partition : Partition ,
begin : Hash ,
end : Hash ,
2021-03-11 17:28:03 +00:00
// Are we a node that stores this partition or not?
retain : bool ,
}
impl < F , R > TableSyncer < F , R >
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2022-12-14 11:28:07 +00:00
pub ( crate ) fn new (
2021-03-16 10:43:58 +00:00
system : Arc < System > ,
data : Arc < TableData < F , R > > ,
merkle : Arc < MerkleUpdater < F , R > > ,
2021-03-12 14:05:26 +00:00
) -> Arc < Self > {
2021-10-14 09:50:12 +00:00
let endpoint = system
. netapp
2021-12-14 11:34:01 +00:00
. endpoint ( format! ( " garage_table/sync.rs/Rpc: {} " , F ::TABLE_NAME ) ) ;
2021-03-12 14:05:26 +00:00
2021-03-11 17:28:03 +00:00
let syncer = Arc ::new ( Self {
2022-12-14 11:28:07 +00:00
system ,
2021-12-14 11:34:01 +00:00
data ,
2021-03-16 10:43:58 +00:00
merkle ,
2022-12-14 11:28:07 +00:00
add_full_sync_tx : ArcSwapOption ::new ( None ) ,
2021-10-14 09:50:12 +00:00
endpoint ,
2021-03-11 17:28:03 +00:00
} ) ;
2021-10-14 09:50:12 +00:00
syncer . endpoint . set_handler ( syncer . clone ( ) ) ;
2021-03-12 14:05:26 +00:00
2022-12-14 11:28:07 +00:00
syncer
}
2022-12-14 11:51:16 +00:00
pub ( crate ) fn spawn_workers ( self : & Arc < Self > , bg : & BackgroundRunner ) {
2022-12-14 11:28:07 +00:00
let ( add_full_sync_tx , add_full_sync_rx ) = mpsc ::unbounded_channel ( ) ;
self . add_full_sync_tx
. store ( Some ( Arc ::new ( add_full_sync_tx ) ) ) ;
2022-12-14 11:51:16 +00:00
bg . spawn_worker ( SyncWorker {
2022-12-14 11:28:07 +00:00
syncer : self . clone ( ) ,
ring_recv : self . system . ring . clone ( ) ,
ring : self . system . ring . borrow ( ) . clone ( ) ,
2022-07-08 11:30:26 +00:00
add_full_sync_rx ,
todo : vec ! [ ] ,
next_full_sync : Instant ::now ( ) + Duration ::from_secs ( 20 ) ,
2021-03-11 17:28:03 +00:00
} ) ;
}
2022-12-14 11:28:07 +00:00
pub fn add_full_sync ( & self ) -> Result < ( ) , Error > {
let tx = self . add_full_sync_tx . load ( ) ;
let tx = tx
. as_ref ( )
. ok_or_message ( " table sync worker is not running " ) ? ;
tx . send ( ( ) ) . ok_or_message ( " send error " ) ? ;
Ok ( ( ) )
2021-03-11 17:28:03 +00:00
}
2022-07-08 11:30:26 +00:00
// ----
2021-03-11 17:28:03 +00:00
async fn sync_partition (
2022-07-08 11:30:26 +00:00
self : & Arc < Self > ,
2021-03-11 17:28:03 +00:00
partition : & TodoPartition ,
must_exit : & mut watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
if partition . retain {
2021-03-16 10:43:58 +00:00
let my_id = self . system . id ;
2021-03-11 17:28:03 +00:00
let nodes = self
2021-03-16 10:43:58 +00:00
. data
2021-03-11 17:28:03 +00:00
. replication
2021-03-16 11:18:03 +00:00
. write_nodes ( & partition . begin )
2021-03-11 17:28:03 +00:00
. into_iter ( )
. filter ( | node | * node ! = my_id )
. collect ::< Vec < _ > > ( ) ;
debug! (
" ({}) Syncing {:?} with {:?}... " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME ,
partition ,
nodes
2021-03-11 17:28:03 +00:00
) ;
let mut sync_futures = nodes
. iter ( )
. map ( | node | {
self . clone ( )
. do_sync_with ( partition . clone ( ) , * node , must_exit . clone ( ) )
} )
. collect ::< FuturesUnordered < _ > > ( ) ;
let mut n_errors = 0 ;
while let Some ( r ) = sync_futures . next ( ) . await {
if let Err ( e ) = r {
n_errors + = 1 ;
2021-12-14 11:34:01 +00:00
warn! ( " ({}) Sync error: {} " , F ::TABLE_NAME , e ) ;
2021-03-11 17:28:03 +00:00
}
}
2021-03-16 10:43:58 +00:00
if n_errors > self . data . replication . max_write_errors ( ) {
2021-03-11 17:28:03 +00:00
return Err ( Error ::Message ( format! (
" Sync failed with too many nodes (should have been: {:?}). " ,
nodes
) ) ) ;
}
} else {
2021-03-16 14:58:40 +00:00
self . offload_partition ( & partition . begin , & partition . end , must_exit )
. await ? ;
2021-03-11 17:28:03 +00:00
}
Ok ( ( ) )
}
// Offload partition: this partition is not something we are storing,
// so send it out to all other nodes that store it and delete items locally.
// We don't bother checking if the remote nodes already have the items,
// we just batch-send everything. Offloading isn't supposed to happen very often.
// If any of the nodes that are supposed to store the items is unable to
// save them, we interrupt the process.
async fn offload_partition (
self : & Arc < Self > ,
begin : & Hash ,
end : & Hash ,
must_exit : & mut watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
let mut counter : usize = 0 ;
while ! * must_exit . borrow ( ) {
let mut items = Vec ::new ( ) ;
2022-06-08 08:01:44 +00:00
for item in self . data . store . range ( begin . to_vec ( ) .. end . to_vec ( ) ) ? {
2021-03-11 17:28:03 +00:00
let ( key , value ) = item ? ;
2022-06-08 08:01:44 +00:00
items . push ( ( key . to_vec ( ) , Arc ::new ( ByteBuf ::from ( value ) ) ) ) ;
2021-03-11 17:28:03 +00:00
if items . len ( ) > = 1024 {
break ;
}
}
2021-04-23 19:42:52 +00:00
if ! items . is_empty ( ) {
2021-03-11 17:28:03 +00:00
let nodes = self
2021-03-16 10:43:58 +00:00
. data
2021-03-11 17:28:03 +00:00
. replication
2021-10-26 08:20:05 +00:00
. write_nodes ( begin )
2021-03-11 17:28:03 +00:00
. into_iter ( )
. collect ::< Vec < _ > > ( ) ;
2021-03-16 10:43:58 +00:00
if nodes . contains ( & self . system . id ) {
2021-03-12 14:05:26 +00:00
warn! (
" ({}) Interrupting offload as partitions seem to have changed " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME
2021-03-12 14:05:26 +00:00
) ;
2021-03-11 17:28:03 +00:00
break ;
}
2021-03-16 10:43:58 +00:00
if nodes . len ( ) < self . data . replication . write_quorum ( ) {
2021-04-23 19:42:52 +00:00
return Err ( Error ::Message (
2021-03-12 14:05:26 +00:00
" Not offloading as we don't have a quorum of nodes to write to. "
2021-04-23 19:42:52 +00:00
. to_string ( ) ,
) ) ;
2021-03-11 18:06:27 +00:00
}
2021-03-11 17:28:03 +00:00
counter + = 1 ;
2021-03-12 13:37:46 +00:00
info! (
" ({}) Offloading {} items from {:?}..{:?} ({}) " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME ,
2021-03-11 17:28:03 +00:00
items . len ( ) ,
begin ,
end ,
counter
) ;
self . offload_items ( & items , & nodes [ .. ] ) . await ? ;
} else {
break ;
}
}
Ok ( ( ) )
}
async fn offload_items (
self : & Arc < Self > ,
2021-04-23 19:42:52 +00:00
items : & [ ( Vec < u8 > , Arc < ByteBuf > ) ] ,
2021-10-15 09:05:09 +00:00
nodes : & [ Uuid ] ,
2021-03-11 17:28:03 +00:00
) -> Result < ( ) , Error > {
let values = items . iter ( ) . map ( | ( _k , v ) | v . clone ( ) ) . collect ::< Vec < _ > > ( ) ;
2021-03-12 18:57:37 +00:00
2022-02-16 13:23:04 +00:00
for to in nodes . iter ( ) {
self . data . metrics . sync_items_sent . add (
values . len ( ) as u64 ,
& [
KeyValue ::new ( " table_name " , F ::TABLE_NAME ) ,
KeyValue ::new ( " to " , format! ( " {:?} " , to ) ) ,
] ,
) ;
}
2021-10-14 09:50:12 +00:00
self . system
. rpc
2021-03-12 20:52:19 +00:00
. try_call_many (
2021-10-14 09:50:12 +00:00
& self . endpoint ,
2021-04-23 19:42:52 +00:00
nodes ,
2021-05-02 21:13:08 +00:00
SyncRpc ::Items ( values ) ,
2022-09-19 18:12:19 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND ) . with_quorum ( nodes . len ( ) ) ,
2021-03-12 20:52:19 +00:00
)
. await ? ;
2021-03-11 17:28:03 +00:00
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0 ;
for ( k , v ) in items . iter ( ) {
if ! self . data . delete_if_equal ( & k [ .. ] , & v [ .. ] ) ? {
not_removed + = 1 ;
}
}
if not_removed > 0 {
2021-12-14 11:34:01 +00:00
debug! ( " ({}) {} items not removed during offload because they changed in between (trying again...) " , F ::TABLE_NAME , not_removed ) ;
2021-03-11 17:28:03 +00:00
}
Ok ( ( ) )
}
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
2021-03-11 17:45:26 +00:00
// The driver side is only concerned with sending out the item it has
// and the other side might not have. Receiving items that differ from one
// side to the other will happen when the other side syncs with us,
// which they also do regularly.
2021-03-11 17:28:03 +00:00
2021-03-16 11:18:03 +00:00
fn get_root_ck ( & self , partition : Partition ) -> Result < ( MerkleNodeKey , MerkleNode ) , Error > {
let key = MerkleNodeKey {
partition ,
prefix : vec ! [ ] ,
2021-03-11 17:28:03 +00:00
} ;
2021-03-16 11:18:03 +00:00
let node = self . merkle . read_node ( & key ) ? ;
Ok ( ( key , node ) )
2021-03-11 17:28:03 +00:00
}
async fn do_sync_with (
self : Arc < Self > ,
partition : TodoPartition ,
2021-10-15 09:05:09 +00:00
who : Uuid ,
2021-03-11 17:28:03 +00:00
must_exit : watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
2021-03-16 11:18:03 +00:00
let ( root_ck_key , root_ck ) = self . get_root_ck ( partition . partition ) ? ;
2021-03-11 18:30:24 +00:00
if root_ck . is_empty ( ) {
debug! (
" ({}) Sync {:?} with {:?}: partition is empty. " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME ,
partition ,
who
2021-03-11 18:30:24 +00:00
) ;
2021-03-12 14:05:26 +00:00
return Ok ( ( ) ) ;
2021-03-11 18:30:24 +00:00
}
2021-03-16 11:18:03 +00:00
let root_ck_hash = hash_of ::< MerkleNode > ( & root_ck ) ? ;
2021-03-11 18:30:24 +00:00
2021-03-16 11:18:03 +00:00
// Check if they have the same root checksum
// If so, do nothing.
2021-03-11 17:28:03 +00:00
let root_resp = self
2021-10-14 09:50:12 +00:00
. system
. rpc
2021-03-11 17:28:03 +00:00
. call (
2021-10-14 09:50:12 +00:00
& self . endpoint ,
2021-03-11 17:28:03 +00:00
who ,
2021-05-02 21:13:08 +00:00
SyncRpc ::RootCkHash ( partition . partition , root_ck_hash ) ,
2022-09-19 18:12:19 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND ) ,
2021-03-11 17:28:03 +00:00
)
. await ? ;
let mut todo = match root_resp {
2021-05-02 21:13:08 +00:00
SyncRpc ::RootCkDifferent ( false ) = > {
2021-03-11 17:28:03 +00:00
debug! (
" ({}) Sync {:?} with {:?}: no difference " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME ,
partition ,
who
2021-03-11 17:28:03 +00:00
) ;
return Ok ( ( ) ) ;
}
2021-05-02 21:13:08 +00:00
SyncRpc ::RootCkDifferent ( true ) = > VecDeque ::from ( vec! [ root_ck_key ] ) ,
2021-03-11 17:28:03 +00:00
x = > {
return Err ( Error ::Message ( format! (
" Invalid respone to RootCkHash RPC: {} " ,
debug_serialize ( x )
) ) ) ;
}
} ;
let mut todo_items = vec! [ ] ;
while ! todo . is_empty ( ) & & ! * must_exit . borrow ( ) {
let key = todo . pop_front ( ) . unwrap ( ) ;
2021-03-16 10:43:58 +00:00
let node = self . merkle . read_node ( & key ) ? ;
2021-03-11 17:28:03 +00:00
match node {
MerkleNode ::Empty = > {
// They have items we don't have.
// We don't request those items from them, they will send them.
// We only bother with pushing items that differ
}
2021-03-11 17:50:32 +00:00
MerkleNode ::Leaf ( ik , ivhash ) = > {
2021-03-11 17:28:03 +00:00
// Just send that item directly
2021-03-11 17:50:32 +00:00
if let Some ( val ) = self . data . store . get ( & ik [ .. ] ) ? {
if blake2sum ( & val [ .. ] ) ! = ivhash {
2022-09-20 09:49:48 +00:00
debug! ( " ({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough) " , F ::TABLE_NAME , hex ::encode ( ik ) ) ;
2021-03-11 17:50:32 +00:00
}
2021-03-11 17:28:03 +00:00
todo_items . push ( val . to_vec ( ) ) ;
2021-03-11 17:55:17 +00:00
} else {
2022-09-20 09:49:48 +00:00
debug! ( " ({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough) " , F ::TABLE_NAME , hex ::encode ( ik ) ) ;
2021-03-11 17:28:03 +00:00
}
}
MerkleNode ::Intermediate ( l ) = > {
2021-03-11 17:55:17 +00:00
// Get Merkle node for this tree position at remote node
// and compare it with local node
2021-03-11 17:28:03 +00:00
let remote_node = match self
2021-10-14 09:50:12 +00:00
. system
. rpc
. call (
& self . endpoint ,
who ,
SyncRpc ::GetNode ( key . clone ( ) ) ,
2022-09-19 18:12:19 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND ) ,
2021-10-14 09:50:12 +00:00
)
2021-03-11 17:28:03 +00:00
. await ?
{
2021-05-02 21:13:08 +00:00
SyncRpc ::Node ( _ , node ) = > node ,
2021-03-11 17:28:03 +00:00
x = > {
return Err ( Error ::Message ( format! (
" Invalid respone to GetNode RPC: {} " ,
debug_serialize ( x )
) ) ) ;
}
} ;
let int_l2 = match remote_node {
2021-03-11 17:55:17 +00:00
// If they have an intermediate node at this tree position,
// we can compare them to find differences
2021-03-11 17:28:03 +00:00
MerkleNode ::Intermediate ( l2 ) = > l2 ,
2021-03-11 17:55:17 +00:00
// Otherwise, treat it as if they have nothing for this subtree,
// which will have the consequence of sending them everything
2021-03-11 17:28:03 +00:00
_ = > vec! [ ] ,
} ;
let join = join_ordered ( & l [ .. ] , & int_l2 [ .. ] ) ;
for ( p , v1 , v2 ) in join . into_iter ( ) {
let diff = match ( v1 , v2 ) {
( Some ( _ ) , None ) | ( None , Some ( _ ) ) = > true ,
( Some ( a ) , Some ( b ) ) = > a ! = b ,
_ = > false ,
} ;
if diff {
todo . push_back ( key . add_byte ( * p ) ) ;
}
}
}
}
if todo_items . len ( ) > = 256 {
2021-04-23 19:42:52 +00:00
self . send_items ( who , std ::mem ::take ( & mut todo_items ) )
2021-03-11 17:28:03 +00:00
. await ? ;
}
}
if ! todo_items . is_empty ( ) {
self . send_items ( who , todo_items ) . await ? ;
}
Ok ( ( ) )
}
2021-10-15 09:05:09 +00:00
async fn send_items ( & self , who : Uuid , item_value_list : Vec < Vec < u8 > > ) -> Result < ( ) , Error > {
2021-03-11 17:28:03 +00:00
info! (
" ({}) Sending {} items to {:?} " ,
2021-12-14 11:34:01 +00:00
F ::TABLE_NAME ,
2021-03-11 17:55:17 +00:00
item_value_list . len ( ) ,
2021-03-11 17:28:03 +00:00
who
) ;
2021-03-12 14:05:26 +00:00
let values = item_value_list
. into_iter ( )
2021-03-11 17:55:17 +00:00
. map ( | x | Arc ::new ( ByteBuf ::from ( x ) ) )
. collect ::< Vec < _ > > ( ) ;
2022-02-16 13:23:04 +00:00
self . data . metrics . sync_items_sent . add (
values . len ( ) as u64 ,
& [
KeyValue ::new ( " table_name " , F ::TABLE_NAME ) ,
KeyValue ::new ( " to " , format! ( " {:?} " , who ) ) ,
] ,
) ;
2021-03-11 17:28:03 +00:00
let rpc_resp = self
2021-10-14 09:50:12 +00:00
. system
. rpc
. call (
& self . endpoint ,
who ,
SyncRpc ::Items ( values ) ,
2022-09-19 18:12:19 +00:00
RequestStrategy ::with_priority ( PRIO_BACKGROUND ) ,
2021-10-14 09:50:12 +00:00
)
2021-03-11 17:28:03 +00:00
. await ? ;
2021-05-02 21:13:08 +00:00
if let SyncRpc ::Ok = rpc_resp {
2021-03-11 17:28:03 +00:00
Ok ( ( ) )
} else {
2022-01-03 12:58:05 +00:00
Err ( Error ::unexpected_rpc_message ( rpc_resp ) )
2021-03-11 17:28:03 +00:00
}
}
2021-10-15 09:05:09 +00:00
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
2021-03-11 17:28:03 +00:00
2021-10-15 09:05:09 +00:00
#[ async_trait ]
impl < F , R > EndpointHandler < SyncRpc > for TableSyncer < F , R >
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2022-02-16 13:23:04 +00:00
async fn handle ( self : & Arc < Self > , message : & SyncRpc , from : NodeID ) -> Result < SyncRpc , Error > {
2021-03-11 17:28:03 +00:00
match message {
2021-05-02 21:13:08 +00:00
SyncRpc ::RootCkHash ( range , h ) = > {
2021-03-16 11:18:03 +00:00
let ( _root_ck_key , root_ck ) = self . get_root_ck ( * range ) ? ;
let hash = hash_of ::< MerkleNode > ( & root_ck ) ? ;
2021-05-02 21:13:08 +00:00
Ok ( SyncRpc ::RootCkDifferent ( hash ! = * h ) )
2021-03-11 17:28:03 +00:00
}
2021-05-02 21:13:08 +00:00
SyncRpc ::GetNode ( k ) = > {
2021-10-26 08:20:05 +00:00
let node = self . merkle . read_node ( k ) ? ;
2021-05-02 21:13:08 +00:00
Ok ( SyncRpc ::Node ( k . clone ( ) , node ) )
2021-03-11 17:28:03 +00:00
}
2021-05-02 21:13:08 +00:00
SyncRpc ::Items ( items ) = > {
2022-02-16 13:23:04 +00:00
self . data . metrics . sync_items_received . add (
items . len ( ) as u64 ,
& [
KeyValue ::new ( " table_name " , F ::TABLE_NAME ) ,
KeyValue ::new (
" from " ,
format! ( " {:?} " , Uuid ::try_from ( from . as_ref ( ) ) . unwrap ( ) ) ,
) ,
] ,
) ;
2021-03-12 14:05:26 +00:00
self . data . update_many ( items ) ? ;
2021-05-02 21:13:08 +00:00
Ok ( SyncRpc ::Ok )
2021-03-12 14:05:26 +00:00
}
2022-01-03 12:58:05 +00:00
m = > Err ( Error ::unexpected_rpc_message ( m ) ) ,
2021-03-11 17:28:03 +00:00
}
}
}
2022-07-08 11:30:26 +00:00
// -------- Sync Worker ---------
struct SyncWorker < F : TableSchema + 'static , R : TableReplication + 'static > {
syncer : Arc < TableSyncer < F , R > > ,
ring_recv : watch ::Receiver < Arc < Ring > > ,
ring : Arc < Ring > ,
add_full_sync_rx : mpsc ::UnboundedReceiver < ( ) > ,
todo : Vec < TodoPartition > ,
next_full_sync : Instant ,
}
impl < F : TableSchema + 'static , R : TableReplication + 'static > SyncWorker < F , R > {
fn add_full_sync ( & mut self ) {
let system = & self . syncer . system ;
let data = & self . syncer . data ;
2021-03-16 10:43:58 +00:00
let my_id = system . id ;
2021-03-11 17:28:03 +00:00
self . todo . clear ( ) ;
2021-03-16 11:18:03 +00:00
let partitions = data . replication . partitions ( ) ;
2021-03-11 17:28:03 +00:00
2021-03-16 11:18:03 +00:00
for i in 0 .. partitions . len ( ) {
let begin = partitions [ i ] . 1 ;
2021-03-11 17:28:03 +00:00
2021-03-16 11:18:03 +00:00
let end = if i + 1 < partitions . len ( ) {
2021-03-16 14:58:40 +00:00
partitions [ i + 1 ] . 1
2021-03-11 17:28:03 +00:00
} else {
2021-03-16 11:18:03 +00:00
[ 0xFF u8 ; 32 ] . into ( )
2021-03-11 17:28:03 +00:00
} ;
2021-03-16 11:18:03 +00:00
let nodes = data . replication . write_nodes ( & begin ) ;
2021-03-11 17:28:03 +00:00
let retain = nodes . contains ( & my_id ) ;
if ! retain {
// Check if we have some data to send, otherwise skip
2022-06-08 08:01:44 +00:00
match data . store . range ( begin .. end ) {
Ok ( mut iter ) = > {
if iter . next ( ) . is_none ( ) {
continue ;
}
}
Err ( e ) = > {
warn! ( " DB error in add_full_sync: {} " , e ) ;
continue ;
}
2021-03-11 17:28:03 +00:00
}
}
self . todo . push ( TodoPartition {
2021-03-16 11:18:03 +00:00
partition : partitions [ i ] . 0 ,
begin ,
end ,
2021-03-11 17:28:03 +00:00
retain ,
} ) ;
}
2022-07-08 11:30:26 +00:00
self . next_full_sync = Instant ::now ( ) + ANTI_ENTROPY_INTERVAL ;
2021-03-11 17:28:03 +00:00
}
fn pop_task ( & mut self ) -> Option < TodoPartition > {
if self . todo . is_empty ( ) {
return None ;
}
2021-03-16 14:58:40 +00:00
let i = rand ::thread_rng ( ) . gen_range ( 0 .. self . todo . len ( ) ) ;
2021-03-11 17:28:03 +00:00
if i = = self . todo . len ( ) - 1 {
self . todo . pop ( )
} else {
let replacement = self . todo . pop ( ) . unwrap ( ) ;
let ret = std ::mem ::replace ( & mut self . todo [ i ] , replacement ) ;
Some ( ret )
}
}
}
2022-07-08 11:30:26 +00:00
#[ async_trait ]
impl < F : TableSchema + 'static , R : TableReplication + 'static > Worker for SyncWorker < F , R > {
fn name ( & self ) -> String {
format! ( " {} sync " , F ::TABLE_NAME )
}
2022-12-12 16:16:49 +00:00
fn status ( & self ) -> WorkerStatus {
WorkerStatus {
queue_length : Some ( self . todo . len ( ) as u64 ) ,
.. Default ::default ( )
2022-07-08 11:30:26 +00:00
}
}
async fn work ( & mut self , must_exit : & mut watch ::Receiver < bool > ) -> Result < WorkerState , Error > {
if let Some ( partition ) = self . pop_task ( ) {
self . syncer . sync_partition ( & partition , must_exit ) . await ? ;
Ok ( WorkerState ::Busy )
} else {
Ok ( WorkerState ::Idle )
}
}
2022-12-14 14:25:29 +00:00
async fn wait_for_work ( & mut self ) -> WorkerState {
2022-07-08 11:30:26 +00:00
select! {
s = self . add_full_sync_rx . recv ( ) = > {
if let Some ( ( ) ) = s {
self . add_full_sync ( ) ;
}
} ,
_ = self . ring_recv . changed ( ) = > {
let new_ring = self . ring_recv . borrow ( ) ;
if ! Arc ::ptr_eq ( & new_ring , & self . ring ) {
self . ring = new_ring . clone ( ) ;
drop ( new_ring ) ;
debug! ( " ({}) Ring changed, adding full sync to syncer todo list " , F ::TABLE_NAME ) ;
self . add_full_sync ( ) ;
}
} ,
2022-09-29 13:53:54 +00:00
_ = tokio ::time ::sleep_until ( self . next_full_sync . into ( ) ) = > {
2022-07-08 11:30:26 +00:00
self . add_full_sync ( ) ;
}
}
match self . todo . is_empty ( ) {
false = > WorkerState ::Busy ,
true = > WorkerState ::Idle ,
}
}
}
// ---- UTIL ----
2021-03-11 17:28:03 +00:00
fn hash_of < T : Serialize > ( x : & T ) -> Result < Hash , Error > {
Ok ( blake2sum ( & rmp_to_vec_all_named ( x ) ? [ .. ] ) )
}
fn join_ordered < ' a , K : Ord + Eq , V1 , V2 > (
x : & ' a [ ( K , V1 ) ] ,
y : & ' a [ ( K , V2 ) ] ,
) -> Vec < ( & ' a K , Option < & ' a V1 > , Option < & ' a V2 > ) > {
let mut ret = vec! [ ] ;
let mut i = 0 ;
let mut j = 0 ;
while i < x . len ( ) | | j < y . len ( ) {
if i < x . len ( ) & & j < y . len ( ) & & x [ i ] . 0 = = y [ j ] . 0 {
ret . push ( ( & x [ i ] . 0 , Some ( & x [ i ] . 1 ) , Some ( & y [ j ] . 1 ) ) ) ;
i + = 1 ;
j + = 1 ;
} else if i < x . len ( ) & & ( j = = y . len ( ) | | x [ i ] . 0 < y [ j ] . 0 ) {
ret . push ( ( & x [ i ] . 0 , Some ( & x [ i ] . 1 ) , None ) ) ;
i + = 1 ;
} else if j < y . len ( ) & & ( i = = x . len ( ) | | x [ i ] . 0 > y [ j ] . 0 ) {
2021-03-11 18:30:24 +00:00
ret . push ( ( & y [ j ] . 0 , None , Some ( & y [ j ] . 1 ) ) ) ;
2021-03-11 17:28:03 +00:00
j + = 1 ;
} else {
unreachable! ( ) ;
}
}
ret
}