diff --git a/src/data.rs b/src/data.rs index c1665d2a..a3b7b23b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -25,7 +25,7 @@ impl Eq for FixedBytes32 {} impl fmt::Debug for FixedBytes32 { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", hex::encode(self.0)) + write!(f, "{}…", hex::encode(&self.0[..8])) } } diff --git a/src/table.rs b/src/table.rs index 40114aec..bd26a79d 100644 --- a/src/table.rs +++ b/src/table.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use tokio::sync::RwLock; +use arc_swap::ArcSwapOption; use crate::data::*; use crate::error::Error; @@ -22,7 +22,7 @@ pub struct Table { pub system: Arc, pub store: sled::Tree, - pub syncer: RwLock>>>, + pub syncer: ArcSwapOption>, pub param: TableReplicationParams, } @@ -142,10 +142,10 @@ impl Table { system, store, param, - syncer: RwLock::new(None), + syncer: ArcSwapOption::from(None), }); let syncer = TableSyncer::launch(table.clone()).await; - *table.syncer.write().await = Some(syncer); + table.syncer.swap(Some(syncer)); table } @@ -389,7 +389,7 @@ impl Table { Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let syncer = self.syncer.load_full().unwrap(); let response = syncer .handle_rpc(&rpc, self.system.background.stop_signal.clone()) .await?; @@ -408,7 +408,7 @@ impl Table { } } - async fn handle_update(self: &Arc, mut entries: Vec>) -> Result<(), Error> { + pub async fn handle_update(self: &Arc, mut entries: Vec>) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -437,7 +437,7 @@ impl Table { if old_entry != new_entry { self.instance.updated(old_entry, new_entry).await; - let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let syncer = self.syncer.load_full().unwrap(); self.system.background.spawn(syncer.invalidate(tree_key)); } } diff --git a/src/table_sync.rs b/src/table_sync.rs index 5ef13d6d..f96e45ff 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -30,7 +30,7 @@ pub struct TableSyncer { #[derive(Serialize, Deserialize)] pub enum SyncRPC { Checksums(Vec), - DifferentSet(Vec), + Difference(Vec, Vec>), } pub struct SyncTodo { @@ -172,10 +172,12 @@ impl TableSyncer { .root_checksum(&partition.begin, &partition.end, must_exit) .await?; + let my_id = self.table.system.id.clone(); let ring = self.table.system.ring.borrow().clone(); let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor); let mut sync_futures = nodes .iter() + .filter(|node| **node != my_id) .map(|node| { self.clone() .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) @@ -364,21 +366,25 @@ impl TableSyncer { .table .rpc_call(&who, &TableRPC::::SyncRPC(SyncRPC::Checksums(step))) .await?; - if let TableRPC::::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp { - let mut items = vec![]; - for differing in s.drain(..) { + if let TableRPC::::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp { + eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len()); + let mut items_to_send = vec![]; + for differing in diff_ranges.drain(..) { if differing.level == 0 { - items.push(differing.begin); + items_to_send.push(differing.begin); } else { let checksum = self.range_checksum(&differing, &mut must_exit).await?; todo.push_back(checksum); } } - if items.len() > 0 { + if diff_items.len() > 0 { + self.table.handle_update(diff_items).await?; + } + if items_to_send.len() > 0 { self.table .system .background - .spawn(self.clone().send_items(who.clone(), items)); + .spawn(self.clone().send_items(who.clone(), items_to_send)); } } else { return Err(Error::Message(format!( @@ -424,20 +430,47 @@ impl TableSyncer { mut must_exit: watch::Receiver, ) -> Result { if let SyncRPC::Checksums(checksums) = message { - let mut ret = vec![]; + let mut ret_ranges = vec![]; + let mut ret_items = vec![]; for ckr in checksums.iter() { let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; for (range, hash) in ckr.children.iter() { - match our_ckr + // Only consider items that are in the intersection of the two ranges + // (other ranges will be exchanged at some point) + if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + break; + } + + let differs = match our_ckr .children .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { - Err(_) => { - ret.push(range.clone()); + Err(_) => true, + Ok(i) => our_ckr.children[i].1 != *hash, + }; + if differs { + ret_ranges.push(range.clone()); + if range.level == 0 { + if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); + } } - Ok(i) => { - if our_ckr.children[i].1 != *hash { - ret.push(range.clone()); + } + } + for (range, _hash) in our_ckr.children.iter() { + if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + break; + } + + let not_present = ckr + .children + .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) + .is_err(); + if not_present { + ret_ranges.push(range.clone()); + if range.level == 0 { + if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } @@ -448,12 +481,13 @@ impl TableSyncer { .map(|x| x.children.len()) .fold(0, |x, y| x + y); eprintln!( - "({}) Checksum comparison RPC: {} different out of {}", + "({}) Checksum comparison RPC: {} different + {} items for {} received", self.table.name, - ret.len(), + ret_ranges.len(), + ret_items.len(), n_checksums ); - return Ok(SyncRPC::DifferentSet(ret)); + return Ok(SyncRPC::Difference(ret_ranges, ret_items)); } Err(Error::Message(format!("Unexpected sync RPC"))) }