Make sync send data both ways

This commit is contained in:
Alex 2020-04-17 18:27:29 +02:00
parent 69f1d8fef2
commit b780f6485d
3 changed files with 59 additions and 25 deletions

View file

@ -25,7 +25,7 @@ impl Eq for FixedBytes32 {}
impl fmt::Debug for FixedBytes32 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", hex::encode(self.0))
write!(f, "{}", hex::encode(&self.0[..8]))
}
}

View file

@ -6,7 +6,7 @@ use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::sync::RwLock;
use arc_swap::ArcSwapOption;
use crate::data::*;
use crate::error::Error;
@ -22,7 +22,7 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
pub syncer: ArcSwapOption<TableSyncer<F>>,
pub param: TableReplicationParams,
}
@ -142,10 +142,10 @@ impl<F: TableSchema + 'static> Table<F> {
system,
store,
param,
syncer: RwLock::new(None),
syncer: ArcSwapOption::from(None),
});
let syncer = TableSyncer::launch(table.clone()).await;
*table.syncer.write().await = Some(syncer);
table.syncer.swap(Some(syncer));
table
}
@ -389,7 +389,7 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
let syncer = self.syncer.load_full().unwrap();
let response = syncer
.handle_rpc(&rpc, self.system.background.stop_signal.clone())
.await?;
@ -408,7 +408,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@ -437,7 +437,7 @@ impl<F: TableSchema + 'static> Table<F> {
if old_entry != new_entry {
self.instance.updated(old_entry, new_entry).await;
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
let syncer = self.syncer.load_full().unwrap();
self.system.background.spawn(syncer.invalidate(tree_key));
}
}

View file

@ -30,7 +30,7 @@ pub struct TableSyncer<F: TableSchema> {
#[derive(Serialize, Deserialize)]
pub enum SyncRPC {
Checksums(Vec<RangeChecksum>),
DifferentSet(Vec<SyncRange>),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
pub struct SyncTodo {
@ -172,10 +172,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
let my_id = self.table.system.id.clone();
let ring = self.table.system.ring.borrow().clone();
let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
let mut sync_futures = nodes
.iter()
.filter(|node| **node != my_id)
.map(|node| {
self.clone()
.do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())
@ -364,21 +366,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.table
.rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)))
.await?;
if let TableRPC::<F>::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp {
eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len());
let mut items_to_send = vec![];
for differing in diff_ranges.drain(..) {
if differing.level == 0 {
items.push(differing.begin);
items_to_send.push(differing.begin);
} else {
let checksum = self.range_checksum(&differing, &mut must_exit).await?;
todo.push_back(checksum);
}
}
if items.len() > 0 {
if diff_items.len() > 0 {
self.table.handle_update(diff_items).await?;
}
if items_to_send.len() > 0 {
self.table
.system
.background
.spawn(self.clone().send_items(who.clone(), items));
.spawn(self.clone().send_items(who.clone(), items_to_send));
}
} else {
return Err(Error::Message(format!(
@ -424,20 +430,47 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
mut must_exit: watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
if let SyncRPC::Checksums(checksums) = message {
let mut ret = vec![];
let mut ret_ranges = vec![];
let mut ret_items = vec![];
for ckr in checksums.iter() {
let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
for (range, hash) in ckr.children.iter() {
match our_ckr
// Only consider items that are in the intersection of the two ranges
// (other ranges will be exchanged at some point)
if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
break;
}
let differs = match our_ckr
.children
.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
{
Err(_) => {
ret.push(range.clone());
Err(_) => true,
Ok(i) => our_ckr.children[i].1 != *hash,
};
if differs {
ret_ranges.push(range.clone());
if range.level == 0 {
if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
Ok(i) => {
if our_ckr.children[i].1 != *hash {
ret.push(range.clone());
}
}
for (range, _hash) in our_ckr.children.iter() {
if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
break;
}
let not_present = ckr
.children
.binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
.is_err();
if not_present {
ret_ranges.push(range.clone());
if range.level == 0 {
if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
@ -448,12 +481,13 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.map(|x| x.children.len())
.fold(0, |x, y| x + y);
eprintln!(
"({}) Checksum comparison RPC: {} different out of {}",
"({}) Checksum comparison RPC: {} different + {} items for {} received",
self.table.name,
ret.len(),
ret_ranges.len(),
ret_items.len(),
n_checksums
);
return Ok(SyncRPC::DifferentSet(ret));
return Ok(SyncRPC::Difference(ret_ranges, ret_items));
}
Err(Error::Message(format!("Unexpected sync RPC")))
}