2021-03-11 18:28:03 +01:00
use std ::collections ::VecDeque ;
use std ::convert ::TryInto ;
use std ::sync ::{ Arc , Mutex } ;
use std ::time ::{ Duration , Instant } ;
use futures ::future ::join_all ;
use futures ::{ pin_mut , select } ;
use futures_util ::future ::* ;
use futures_util ::stream ::* ;
use rand ::Rng ;
use serde ::{ Deserialize , Serialize } ;
use serde_bytes ::ByteBuf ;
use tokio ::sync ::{ mpsc , watch } ;
use garage_util ::data ::* ;
use garage_util ::error ::Error ;
2021-03-12 15:05:26 +01:00
use garage_rpc ::ring ::Ring ;
use garage_rpc ::rpc_client ::* ;
use garage_rpc ::rpc_server ::* ;
2021-03-11 18:28:03 +01:00
use crate ::data ::* ;
use crate ::merkle ::* ;
use crate ::replication ::* ;
use crate ::* ;
/// Timeout passed to every RPC call issued by the syncer in this file.
const TABLE_SYNC_RPC_TIMEOUT : Duration = Duration ::from_secs ( 30 ) ;
/// Anti-entropy period: once the syncer has had nothing to do for this long
/// (10 minutes), the watcher task schedules a new full sync.
const ANTI_ENTROPY_INTERVAL : Duration = Duration ::from_secs ( 10 * 60 ) ;
/// Synchronizes the content of one table with the other nodes: it keeps a
/// list of partitions to process, pushes differing items to peers and
/// offloads partitions that this node does not store.
pub struct TableSyncer < F : TableSchema , R : TableReplication > {
// Table data; the syncer reads the table name, the local store and the
// Merkle updater from it.
data : Arc < TableData < F > > ,
2021-03-12 15:07:23 +01:00
// Gives access to the system (node id, ring, background runner) and to
// the replication strategy.
aux : Arc < TableAux < R > > ,
2021-03-11 18:28:03 +01:00
// Partitions waiting to be synced; filled by `add_full_sync`, drained by
// `syncer_task`.
todo : Mutex < SyncTodo > ,
2021-03-12 15:05:26 +01:00
// Client used to send `SyncRPC` messages to other nodes.
rpc_client : Arc < RpcClient < SyncRPC > > ,
2021-03-11 18:28:03 +01:00
}
/// Root checksum list of a partition range: one `(partition, hash of the
/// partition's root Merkle node)` entry per non-empty partition, as built by
/// `get_root_ck`.
type RootCk = Vec < ( MerklePartition , Hash ) > ;
/// A contiguous range of Merkle partitions, exchanged in sync RPCs.
#[ derive(Debug, Clone, Copy, Serialize, Deserialize) ]
pub struct PartitionRange {
// First partition of the range (included).
begin : MerklePartition ,
// First partition after the range (excluded).
// If end is None, the range goes all the way to partition 0xFFFF included.
end : Option < MerklePartition > ,
}
/// Messages exchanged between syncers of the same table on different nodes.
/// `RootCkHash`, `GetNode` and `Items` are requests handled by `handle_rpc`;
/// the other variants are the responses it produces.
#[ derive(Serialize, Deserialize) ]
pub ( crate ) enum SyncRPC {
// Request: hash of the sender's root checksum list for the given range.
RootCkHash ( PartitionRange , Hash ) ,
// Response to RootCkHash when the hashes differ: the receiver's full
// root checksum list for that range.
RootCkList ( PartitionRange , RootCk ) ,
// Response to RootCkHash when both sides have the same hash.
CkNoDifference ,
// Request: read the Merkle node stored at this key.
GetNode ( MerkleNodeKey ) ,
// Response to GetNode: the requested key and the node found there.
Node ( MerkleNodeKey , MerkleNode ) ,
2021-03-12 15:05:26 +01:00
// Request: serialized item values that the receiver passes to `update_many`.
Items ( Vec < Arc < ByteBuf > > ) ,
// Response to Items once the update succeeded.
Ok ,
2021-03-11 18:28:03 +01:00
}
2021-03-12 15:05:26 +01:00
// Makes SyncRPC usable as the message type of `RpcClient<SyncRPC>` and of
// the handler registered on the RPC server.
// NOTE(review): the RpcMessage trait is defined in garage_rpc; its exact
// requirements are not visible from this file.
impl RpcMessage for SyncRPC { }
2021-03-11 18:28:03 +01:00
/// List of partitions that still have to be processed by the syncer task.
struct SyncTodo {
// Pending partitions; `pop_task` removes one of them at random.
todo : Vec < TodoPartition > ,
}
/// One unit of work for the syncer: a partition range and what to do with it.
#[ derive(Debug, Clone) ]
struct TodoPartition {
// Range of Merkle partitions covered by this task.
range : PartitionRange ,
// Are we a node that stores this partition or not?
// When true the range is synced with the other nodes, when false it is
// offloaded to them (see `sync_partition`).
retain : bool ,
}
impl < F , R > TableSyncer < F , R >
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2021-03-12 15:05:26 +01:00
pub ( crate ) fn launch (
data : Arc < TableData < F > > ,
2021-03-12 15:07:23 +01:00
aux : Arc < TableAux < R > > ,
2021-03-12 15:05:26 +01:00
rpc_server : & mut RpcServer ,
) -> Arc < Self > {
let rpc_path = format! ( " table_ {} /sync " , data . name ) ;
let rpc_client = aux . system . rpc_client ::< SyncRPC > ( & rpc_path ) ;
2021-03-11 18:28:03 +01:00
let todo = SyncTodo { todo : vec ! [ ] } ;
let syncer = Arc ::new ( Self {
data : data . clone ( ) ,
aux : aux . clone ( ) ,
todo : Mutex ::new ( todo ) ,
2021-03-12 15:05:26 +01:00
rpc_client ,
2021-03-11 18:28:03 +01:00
} ) ;
2021-03-12 15:05:26 +01:00
syncer . register_handler ( rpc_server , rpc_path ) ;
2021-03-11 18:28:03 +01:00
let ( busy_tx , busy_rx ) = mpsc ::unbounded_channel ( ) ;
let s1 = syncer . clone ( ) ;
aux . system . background . spawn_worker (
format! ( " table sync watcher for {} " , data . name ) ,
move | must_exit : watch ::Receiver < bool > | s1 . watcher_task ( must_exit , busy_rx ) ,
) ;
let s2 = syncer . clone ( ) ;
aux . system . background . spawn_worker (
format! ( " table syncer for {} " , data . name ) ,
move | must_exit : watch ::Receiver < bool > | s2 . syncer_task ( must_exit , busy_tx ) ,
) ;
let s3 = syncer . clone ( ) ;
tokio ::spawn ( async move {
tokio ::time ::delay_for ( Duration ::from_secs ( 20 ) ) . await ;
s3 . add_full_sync ( ) ;
} ) ;
syncer
}
2021-03-12 15:05:26 +01:00
fn register_handler ( self : & Arc < Self > , rpc_server : & mut RpcServer , path : String ) {
let self2 = self . clone ( ) ;
rpc_server . add_handler ::< SyncRPC , _ , _ > ( path , move | msg , _addr | {
let self2 = self2 . clone ( ) ;
async move { self2 . handle_rpc ( & msg ) . await }
} ) ;
let self2 = self . clone ( ) ;
self . rpc_client
. set_local_handler ( self . aux . system . id , move | msg | {
let self2 = self2 . clone ( ) ;
async move { self2 . handle_rpc ( & msg ) . await }
} ) ;
}
2021-03-11 18:28:03 +01:00
async fn watcher_task (
self : Arc < Self > ,
mut must_exit : watch ::Receiver < bool > ,
mut busy_rx : mpsc ::UnboundedReceiver < bool > ,
) -> Result < ( ) , Error > {
2021-03-12 14:51:17 +01:00
let mut prev_ring : Arc < Ring > = self . aux . system . ring . borrow ( ) . clone ( ) ;
2021-03-11 18:28:03 +01:00
let mut ring_recv : watch ::Receiver < Arc < Ring > > = self . aux . system . ring . clone ( ) ;
let mut nothing_to_do_since = Some ( Instant ::now ( ) ) ;
while ! * must_exit . borrow ( ) {
let s_ring_recv = ring_recv . recv ( ) . fuse ( ) ;
let s_busy = busy_rx . recv ( ) . fuse ( ) ;
let s_must_exit = must_exit . recv ( ) . fuse ( ) ;
let s_timeout = tokio ::time ::delay_for ( Duration ::from_secs ( 1 ) ) . fuse ( ) ;
pin_mut! ( s_ring_recv , s_busy , s_must_exit , s_timeout ) ;
select! {
new_ring_r = s_ring_recv = > {
2021-03-12 14:51:17 +01:00
if let Some ( new_ring ) = new_ring_r {
if ! Arc ::ptr_eq ( & new_ring , & prev_ring ) {
debug! ( " ({}) Ring changed, adding full sync to syncer todo list " , self . data . name ) ;
self . add_full_sync ( ) ;
prev_ring = new_ring ;
}
2021-03-11 18:28:03 +01:00
}
}
busy_opt = s_busy = > {
if let Some ( busy ) = busy_opt {
if busy {
nothing_to_do_since = None ;
} else {
if nothing_to_do_since . is_none ( ) {
nothing_to_do_since = Some ( Instant ::now ( ) ) ;
}
}
}
}
must_exit_v = s_must_exit = > {
if must_exit_v . unwrap_or ( false ) {
break ;
}
}
_ = s_timeout = > {
if nothing_to_do_since . map ( | t | Instant ::now ( ) - t > = ANTI_ENTROPY_INTERVAL ) . unwrap_or ( false ) {
nothing_to_do_since = None ;
2021-03-11 18:45:26 +01:00
debug! ( " ({}) Interval passed, adding full sync to syncer todo list " , self . data . name ) ;
2021-03-11 18:28:03 +01:00
self . add_full_sync ( ) ;
}
}
}
}
Ok ( ( ) )
}
/// Replaces the content of the todo list with a full sync of the table.
///
/// Panics if the todo mutex is poisoned.
pub fn add_full_sync(&self) {
    let mut todo = self.todo.lock().unwrap();
    todo.add_full_sync(&self.data, &self.aux);
}
async fn syncer_task (
self : Arc < Self > ,
mut must_exit : watch ::Receiver < bool > ,
busy_tx : mpsc ::UnboundedSender < bool > ,
) -> Result < ( ) , Error > {
while ! * must_exit . borrow ( ) {
let task = self . todo . lock ( ) . unwrap ( ) . pop_task ( ) ;
if let Some ( partition ) = task {
busy_tx . send ( true ) ? ;
let res = self
. clone ( )
. sync_partition ( & partition , & mut must_exit )
. await ;
if let Err ( e ) = res {
warn! (
" ({}) Error while syncing {:?}: {} " ,
self . data . name , partition , e
) ;
}
} else {
busy_tx . send ( false ) ? ;
tokio ::time ::delay_for ( Duration ::from_secs ( 1 ) ) . await ;
}
}
Ok ( ( ) )
}
async fn sync_partition (
self : Arc < Self > ,
partition : & TodoPartition ,
must_exit : & mut watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
if partition . retain {
let my_id = self . aux . system . id ;
let nodes = self
. aux
. replication
. write_nodes (
& hash_of_merkle_partition ( partition . range . begin ) ,
& self . aux . system ,
)
. into_iter ( )
. filter ( | node | * node ! = my_id )
. collect ::< Vec < _ > > ( ) ;
debug! (
" ({}) Syncing {:?} with {:?}... " ,
self . data . name , partition , nodes
) ;
let mut sync_futures = nodes
. iter ( )
. map ( | node | {
self . clone ( )
. do_sync_with ( partition . clone ( ) , * node , must_exit . clone ( ) )
} )
. collect ::< FuturesUnordered < _ > > ( ) ;
let mut n_errors = 0 ;
while let Some ( r ) = sync_futures . next ( ) . await {
if let Err ( e ) = r {
n_errors + = 1 ;
warn! ( " ({}) Sync error: {} " , self . data . name , e ) ;
}
}
if n_errors > self . aux . replication . max_write_errors ( ) {
return Err ( Error ::Message ( format! (
" Sync failed with too many nodes (should have been: {:?}). " ,
nodes
) ) ) ;
}
} else {
self . offload_partition (
& hash_of_merkle_partition ( partition . range . begin ) ,
& hash_of_merkle_partition_opt ( partition . range . end ) ,
must_exit ,
)
. await ? ;
}
Ok ( ( ) )
}
// Offload partition: this partition is not something we are storing,
// so send it out to all other nodes that store it and delete items locally.
// We don't bother checking if the remote nodes already have the items,
// we just batch-send everything. Offloading isn't supposed to happen very often.
// If any of the nodes that are supposed to store the items is unable to
// save them, we interrupt the process.
async fn offload_partition (
self : & Arc < Self > ,
begin : & Hash ,
end : & Hash ,
must_exit : & mut watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
let mut counter : usize = 0 ;
while ! * must_exit . borrow ( ) {
let mut items = Vec ::new ( ) ;
for item in self . data . store . range ( begin . to_vec ( ) .. end . to_vec ( ) ) {
let ( key , value ) = item ? ;
items . push ( ( key . to_vec ( ) , Arc ::new ( ByteBuf ::from ( value . as_ref ( ) ) ) ) ) ;
if items . len ( ) > = 1024 {
break ;
}
}
if items . len ( ) > 0 {
let nodes = self
. aux
. replication
. write_nodes ( & begin , & self . aux . system )
. into_iter ( )
. collect ::< Vec < _ > > ( ) ;
if nodes . contains ( & self . aux . system . id ) {
2021-03-12 15:05:26 +01:00
warn! (
" ({}) Interrupting offload as partitions seem to have changed " ,
self . data . name
) ;
2021-03-11 18:28:03 +01:00
break ;
}
2021-03-11 19:06:27 +01:00
if nodes . len ( ) < self . aux . replication . write_quorum ( & self . aux . system ) {
2021-03-12 15:05:26 +01:00
return Err ( Error ::Message ( format! (
" Not offloading as we don't have a quorum of nodes to write to. "
) ) ) ;
2021-03-11 19:06:27 +01:00
}
2021-03-11 18:28:03 +01:00
counter + = 1 ;
2021-03-12 14:37:46 +01:00
info! (
" ({}) Offloading {} items from {:?}..{:?} ({}) " ,
self . data . name ,
2021-03-11 18:28:03 +01:00
items . len ( ) ,
begin ,
end ,
counter
) ;
self . offload_items ( & items , & nodes [ .. ] ) . await ? ;
} else {
break ;
}
}
Ok ( ( ) )
}
async fn offload_items (
self : & Arc < Self > ,
items : & Vec < ( Vec < u8 > , Arc < ByteBuf > ) > ,
nodes : & [ UUID ] ,
) -> Result < ( ) , Error > {
let values = items . iter ( ) . map ( | ( _k , v ) | v . clone ( ) ) . collect ::< Vec < _ > > ( ) ;
2021-03-12 15:05:26 +01:00
let update_msg = Arc ::new ( SyncRPC ::Items ( values ) ) ;
2021-03-11 18:28:03 +01:00
for res in join_all ( nodes . iter ( ) . map ( | to | {
2021-03-12 15:05:26 +01:00
self . rpc_client
2021-03-11 18:28:03 +01:00
. call_arc ( * to , update_msg . clone ( ) , TABLE_SYNC_RPC_TIMEOUT )
} ) )
. await
{
res ? ;
}
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0 ;
for ( k , v ) in items . iter ( ) {
if ! self . data . delete_if_equal ( & k [ .. ] , & v [ .. ] ) ? {
not_removed + = 1 ;
}
}
if not_removed > 0 {
2021-03-12 14:37:46 +01:00
debug! ( " ({}) {} items not removed during offload because they changed in between (trying again...) " , self . data . name , not_removed ) ;
2021-03-11 18:28:03 +01:00
}
Ok ( ( ) )
}
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
2021-03-11 18:45:26 +01:00
// The driver side is only concerned with sending out the items it has
// and the other side might not have. Receiving items that differ from one
// side to the other will happen when the other side syncs with us,
// which they also do regularly.
2021-03-11 18:28:03 +01:00
fn get_root_ck ( & self , range : PartitionRange ) -> Result < RootCk , Error > {
let begin = u16 ::from_be_bytes ( range . begin ) ;
let range_iter = match range . end {
Some ( end ) = > {
let end = u16 ::from_be_bytes ( end ) ;
begin ..= ( end - 1 )
}
None = > begin ..= 0xFFFF ,
} ;
let mut ret = vec! [ ] ;
for i in range_iter {
let key = MerkleNodeKey {
partition : u16 ::to_be_bytes ( i ) ,
prefix : vec ! [ ] ,
} ;
match self . data . merkle_updater . read_node ( & key ) ? {
MerkleNode ::Empty = > ( ) ,
x = > {
ret . push ( ( key . partition , hash_of ( & x ) ? ) ) ;
}
}
}
Ok ( ret )
}
async fn do_sync_with (
self : Arc < Self > ,
partition : TodoPartition ,
who : UUID ,
must_exit : watch ::Receiver < bool > ,
) -> Result < ( ) , Error > {
let root_ck = self . get_root_ck ( partition . range ) ? ;
2021-03-11 19:30:24 +01:00
if root_ck . is_empty ( ) {
debug! (
" ({}) Sync {:?} with {:?}: partition is empty. " ,
self . data . name , partition , who
) ;
2021-03-12 15:05:26 +01:00
return Ok ( ( ) ) ;
2021-03-11 19:30:24 +01:00
}
2021-03-11 18:28:03 +01:00
let root_ck_hash = hash_of ( & root_ck ) ? ;
// If their root checksum has level > than us, use that as a reference
let root_resp = self
. rpc_client
. call (
who ,
2021-03-12 15:05:26 +01:00
SyncRPC ::RootCkHash ( partition . range , root_ck_hash ) ,
2021-03-11 18:28:03 +01:00
TABLE_SYNC_RPC_TIMEOUT ,
)
. await ? ;
let mut todo = match root_resp {
2021-03-12 15:05:26 +01:00
SyncRPC ::CkNoDifference = > {
2021-03-11 18:28:03 +01:00
debug! (
" ({}) Sync {:?} with {:?}: no difference " ,
self . data . name , partition , who
) ;
return Ok ( ( ) ) ;
}
2021-03-12 15:05:26 +01:00
SyncRPC ::RootCkList ( _ , their_root_ck ) = > {
2021-03-11 18:28:03 +01:00
let join = join_ordered ( & root_ck [ .. ] , & their_root_ck [ .. ] ) ;
let mut todo = VecDeque ::new ( ) ;
for ( p , v1 , v2 ) in join . iter ( ) {
let diff = match ( v1 , v2 ) {
( Some ( _ ) , None ) | ( None , Some ( _ ) ) = > true ,
( Some ( a ) , Some ( b ) ) = > a ! = b ,
_ = > false ,
} ;
if diff {
todo . push_back ( MerkleNodeKey {
partition : * * p ,
prefix : vec ! [ ] ,
} ) ;
}
}
debug! (
" ({}) Sync {:?} with {:?}: todo.len() = {} " ,
self . data . name ,
partition ,
who ,
todo . len ( )
) ;
todo
}
x = > {
return Err ( Error ::Message ( format! (
" Invalid respone to RootCkHash RPC: {} " ,
debug_serialize ( x )
) ) ) ;
}
} ;
let mut todo_items = vec! [ ] ;
while ! todo . is_empty ( ) & & ! * must_exit . borrow ( ) {
let key = todo . pop_front ( ) . unwrap ( ) ;
let node = self . data . merkle_updater . read_node ( & key ) ? ;
match node {
MerkleNode ::Empty = > {
// They have items we don't have.
// We don't request those items from them, they will send them.
// We only bother with pushing items that differ
}
2021-03-11 18:50:32 +01:00
MerkleNode ::Leaf ( ik , ivhash ) = > {
2021-03-11 18:28:03 +01:00
// Just send that item directly
2021-03-11 18:50:32 +01:00
if let Some ( val ) = self . data . store . get ( & ik [ .. ] ) ? {
if blake2sum ( & val [ .. ] ) ! = ivhash {
2021-03-12 14:37:46 +01:00
warn! ( " ({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough) " , self . data . name , ik ) ;
2021-03-11 18:50:32 +01:00
}
2021-03-11 18:28:03 +01:00
todo_items . push ( val . to_vec ( ) ) ;
2021-03-11 18:55:17 +01:00
} else {
2021-03-12 14:37:46 +01:00
warn! ( " ({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough) " , self . data . name , ik ) ;
2021-03-11 18:28:03 +01:00
}
}
MerkleNode ::Intermediate ( l ) = > {
2021-03-11 18:55:17 +01:00
// Get Merkle node for this tree position at remote node
// and compare it with local node
2021-03-11 18:28:03 +01:00
let remote_node = match self
. rpc_client
2021-03-12 15:05:26 +01:00
. call ( who , SyncRPC ::GetNode ( key . clone ( ) ) , TABLE_SYNC_RPC_TIMEOUT )
2021-03-11 18:28:03 +01:00
. await ?
{
2021-03-12 15:05:26 +01:00
SyncRPC ::Node ( _ , node ) = > node ,
2021-03-11 18:28:03 +01:00
x = > {
return Err ( Error ::Message ( format! (
" Invalid respone to GetNode RPC: {} " ,
debug_serialize ( x )
) ) ) ;
}
} ;
let int_l2 = match remote_node {
2021-03-11 18:55:17 +01:00
// If they have an intermediate node at this tree position,
// we can compare them to find differences
2021-03-11 18:28:03 +01:00
MerkleNode ::Intermediate ( l2 ) = > l2 ,
2021-03-11 18:55:17 +01:00
// Otherwise, treat it as if they have nothing for this subtree,
// which will have the consequence of sending them everything
2021-03-11 18:28:03 +01:00
_ = > vec! [ ] ,
} ;
let join = join_ordered ( & l [ .. ] , & int_l2 [ .. ] ) ;
for ( p , v1 , v2 ) in join . into_iter ( ) {
let diff = match ( v1 , v2 ) {
( Some ( _ ) , None ) | ( None , Some ( _ ) ) = > true ,
( Some ( a ) , Some ( b ) ) = > a ! = b ,
_ = > false ,
} ;
if diff {
todo . push_back ( key . add_byte ( * p ) ) ;
}
}
}
}
if todo_items . len ( ) > = 256 {
self . send_items ( who , std ::mem ::replace ( & mut todo_items , vec! [ ] ) )
. await ? ;
}
}
if ! todo_items . is_empty ( ) {
self . send_items ( who , todo_items ) . await ? ;
}
Ok ( ( ) )
}
2021-03-11 18:55:17 +01:00
async fn send_items ( & self , who : UUID , item_value_list : Vec < Vec < u8 > > ) -> Result < ( ) , Error > {
2021-03-11 18:28:03 +01:00
info! (
" ({}) Sending {} items to {:?} " ,
self . data . name ,
2021-03-11 18:55:17 +01:00
item_value_list . len ( ) ,
2021-03-11 18:28:03 +01:00
who
) ;
2021-03-12 15:05:26 +01:00
let values = item_value_list
. into_iter ( )
2021-03-11 18:55:17 +01:00
. map ( | x | Arc ::new ( ByteBuf ::from ( x ) ) )
. collect ::< Vec < _ > > ( ) ;
2021-03-11 18:28:03 +01:00
let rpc_resp = self
. rpc_client
2021-03-12 15:05:26 +01:00
. call ( who , SyncRPC ::Items ( values ) , TABLE_SYNC_RPC_TIMEOUT )
2021-03-11 18:28:03 +01:00
. await ? ;
2021-03-12 15:05:26 +01:00
if let SyncRPC ::Ok = rpc_resp {
2021-03-11 18:28:03 +01:00
Ok ( ( ) )
} else {
Err ( Error ::Message ( format! (
" Unexpected response to RPC Update: {} " ,
debug_serialize ( & rpc_resp )
) ) )
}
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
pub ( crate ) async fn handle_rpc ( self : & Arc < Self > , message : & SyncRPC ) -> Result < SyncRPC , Error > {
match message {
SyncRPC ::RootCkHash ( range , h ) = > {
let root_ck = self . get_root_ck ( * range ) ? ;
let hash = hash_of ( & root_ck ) ? ;
if hash = = * h {
Ok ( SyncRPC ::CkNoDifference )
} else {
Ok ( SyncRPC ::RootCkList ( * range , root_ck ) )
}
}
SyncRPC ::GetNode ( k ) = > {
let node = self . data . merkle_updater . read_node ( & k ) ? ;
Ok ( SyncRPC ::Node ( k . clone ( ) , node ) )
}
2021-03-12 15:05:26 +01:00
SyncRPC ::Items ( items ) = > {
self . data . update_many ( items ) ? ;
Ok ( SyncRPC ::Ok )
}
2021-03-11 18:28:03 +01:00
_ = > Err ( Error ::Message ( format! ( " Unexpected sync RPC " ) ) ) ,
}
}
}
impl SyncTodo {
fn add_full_sync < F : TableSchema , R : TableReplication > (
& mut self ,
data : & TableData < F > ,
2021-03-12 15:07:23 +01:00
aux : & TableAux < R > ,
2021-03-11 18:28:03 +01:00
) {
let my_id = aux . system . id ;
self . todo . clear ( ) ;
let ring = aux . system . ring . borrow ( ) . clone ( ) ;
let split_points = aux . replication . split_points ( & ring ) ;
for i in 0 .. split_points . len ( ) {
let begin : MerklePartition = {
let b = split_points [ i ] ;
assert_eq! ( b . as_slice ( ) [ 2 .. ] , [ 0 u8 ; 30 ] [ .. ] ) ;
b . as_slice ( ) [ .. 2 ] . try_into ( ) . unwrap ( )
} ;
let end : Option < MerklePartition > = if i + 1 < split_points . len ( ) {
let e = split_points [ i + 1 ] ;
assert_eq! ( e . as_slice ( ) [ 2 .. ] , [ 0 u8 ; 30 ] [ .. ] ) ;
Some ( e . as_slice ( ) [ .. 2 ] . try_into ( ) . unwrap ( ) )
} else {
None
} ;
let begin_hash = hash_of_merkle_partition ( begin ) ;
let end_hash = hash_of_merkle_partition_opt ( end ) ;
let nodes = aux . replication . replication_nodes ( & begin_hash , & ring ) ;
let retain = nodes . contains ( & my_id ) ;
if ! retain {
// Check if we have some data to send, otherwise skip
if data . store . range ( begin_hash .. end_hash ) . next ( ) . is_none ( ) {
continue ;
}
}
self . todo . push ( TodoPartition {
range : PartitionRange { begin , end } ,
retain ,
} ) ;
}
}
fn pop_task ( & mut self ) -> Option < TodoPartition > {
if self . todo . is_empty ( ) {
return None ;
}
let i = rand ::thread_rng ( ) . gen_range ::< usize , _ , _ > ( 0 , self . todo . len ( ) ) ;
if i = = self . todo . len ( ) - 1 {
self . todo . pop ( )
} else {
let replacement = self . todo . pop ( ) . unwrap ( ) ;
let ret = std ::mem ::replace ( & mut self . todo [ i ] , replacement ) ;
Some ( ret )
}
}
}
fn hash_of < T : Serialize > ( x : & T ) -> Result < Hash , Error > {
Ok ( blake2sum ( & rmp_to_vec_all_named ( x ) ? [ .. ] ) )
}
/// Full outer join of two slices of `(key, value)` pairs.
///
/// Both slices are expected to be sorted by strictly increasing key. The
/// result has one entry per distinct key, in increasing key order, holding a
/// reference to the key and to the value found on each side (`None` for a
/// side that does not have the key).
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
    x: &'a [(K, V1)],
    y: &'a [(K, V2)],
) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
    use std::cmp::Ordering::{Equal, Greater, Less};

    let mut left = x.iter().peekable();
    let mut right = y.iter().peekable();
    let mut joined = Vec::new();

    loop {
        // Decide which side holds the smallest remaining key; an exhausted
        // side always loses.
        let order = match (left.peek(), right.peek()) {
            (None, None) => break,
            (Some(_), None) => Less,
            (None, Some(_)) => Greater,
            (Some((kx, _)), Some((ky, _))) => K::cmp(kx, ky),
        };

        let from_left = if order != Greater { left.next() } else { None };
        let from_right = if order != Less { right.next() } else { None };

        match (from_left, from_right) {
            (Some((k, v1)), Some((_, v2))) => joined.push((k, Some(v1), Some(v2))),
            (Some((k, v1)), None) => joined.push((k, Some(v1), None)),
            (None, Some((k, v2))) => joined.push((k, None, Some(v2))),
            (None, None) => unreachable!(),
        }
    }
    joined
}