diff --git a/src/table/table.rs b/src/table/table.rs index 300e400f..31241530 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -435,7 +435,7 @@ where let syncer = self.syncer.load_full().unwrap(); debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); - let mut count = 0; + let mut count: usize = 0; while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { if key.as_ref() < begin.as_slice() { break; diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 073540d4..1a1b328b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -1,15 +1,14 @@ use rand::Rng; use std::collections::{BTreeMap, VecDeque}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::future::BoxFuture; +use futures::future::join_all; use futures::{pin_mut, select}; use futures_util::future::*; use futures_util::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use tokio::sync::Mutex; use tokio::sync::{mpsc, watch}; use garage_rpc::ring::Ring; @@ -33,7 +32,7 @@ pub struct TableSyncer { pub enum SyncRPC { GetRootChecksumRange(Hash, Hash), RootChecksumRange(SyncRange), - Checksums(Vec, bool), + Checksums(Vec), Difference(Vec, Vec>), } @@ -43,8 +42,11 @@ pub struct SyncTodo { #[derive(Debug, Clone)] struct TodoPartition { + // Partition consists in hashes between begin included and end excluded begin: Hash, end: Hash, + + // Are we a node that stores this partition or not? retain: bool, } @@ -161,7 +163,7 @@ where new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { debug!("({}) Adding ring difference to syncer todo list", self.table.name); - self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); + self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } } @@ -194,7 +196,7 @@ where } pub async fn add_full_scan(&self) { - self.todo.lock().await.add_full_scan(&self.table); + self.todo.lock().unwrap().add_full_scan(&self.table); } async fn syncer_task( @@ -203,7 +205,8 @@ where busy_tx: mpsc::UnboundedSender, ) -> Result<(), Error> { while !*must_exit.borrow() { - if let Some(partition) = self.todo.lock().await.pop_task() { + let task = self.todo.lock().unwrap().pop_task(); + if let Some(partition) = task { busy_tx.send(true)?; let res = self .clone() @@ -228,76 +231,152 @@ where partition: &TodoPartition, must_exit: &mut watch::Receiver, ) -> Result<(), Error> { - let my_id = self.table.system.id; - let nodes = self - .table - .replication - .write_nodes(&partition.begin, &self.table.system) - .into_iter() - .filter(|node| *node != my_id) - .collect::>(); + if partition.retain { + let my_id = self.table.system.id; + let nodes = self + .table + .replication + .write_nodes(&partition.begin, &self.table.system) + .into_iter() + .filter(|node| *node != my_id) + .collect::>(); - debug!( - "({}) Preparing to sync {:?} with {:?}...", - self.table.name, partition, nodes - ); - let root_cks = self - .root_checksum(&partition.begin, &partition.end, must_exit) - .await?; + debug!( + "({}) Preparing to sync {:?} with {:?}...", + self.table.name, partition, nodes + ); + let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; - let mut sync_futures = nodes - .iter() - .map(|node| { - self.clone().do_sync_with( - partition.clone(), - root_cks.clone(), - *node, - partition.retain, - must_exit.clone(), - ) - }) - .collect::>(); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone().do_sync_with( + partition.clone(), + root_cks.clone(), + *node, + must_exit.clone(), + ) + }) + .collect::>(); - let mut n_errors = 0; - while let Some(r) = sync_futures.next().await { - if let Err(e) = r { - n_errors += 1; - warn!("({}) Sync error: {}", self.table.name, e); + let mut n_errors = 0; + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + n_errors += 1; + warn!("({}) Sync error: {}", self.table.name, e); + } } - } - if n_errors > self.table.replication.max_write_errors() { - return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes - ))); - } - - if !partition.retain { - self.table - .delete_range(&partition.begin, &partition.end) + if n_errors > self.table.replication.max_write_errors() { + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); + } + } else { + self.offload_partition(&partition.begin, &partition.end, must_exit) .await?; } Ok(()) } - async fn root_checksum( + // Offload partition: this partition is not something we are storing, + // so send it out to all other nodes that store it and delete items locally. + // We don't bother checking if the remote nodes already have the items, + // we just batch-send everything. Offloading isn't supposed to happen very often. + // If any of the nodes that are supposed to store the items is unable to + // save them, we interrupt the process. + async fn offload_partition( + self: &Arc, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver, + ) -> Result<(), Error> { + let mut counter: usize = 0; + + while !*must_exit.borrow() { + let mut items = Vec::new(); + + for item in self.table.store.range(begin.to_vec()..end.to_vec()) { + let (key, value) = item?; + items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + + if items.len() >= 1024 { + break; + } + } + + if items.len() > 0 { + let nodes = self + .table + .replication + .write_nodes(&begin, &self.table.system) + .into_iter() + .collect::>(); + if nodes.contains(&self.table.system.id) { + warn!("Interrupting offload as partitions seem to have changed"); + break; + } + + counter += 1; + debug!("Offloading items from {:?}..{:?} ({})", begin, end, counter); + self.offload_items(&items, &nodes[..]).await?; + } else { + break; + } + } + + Ok(()) + } + + async fn offload_items( + self: &Arc, + items: &Vec<(Vec, Arc)>, + nodes: &[UUID], + ) -> Result<(), Error> { + let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); + let update_msg = Arc::new(TableRPC::::Update(values)); + + for res in join_all(nodes.iter().map(|to| { + self.table + .rpc_client + .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) + })) + .await + { + res?; + } + + // All remote nodes have written those items, now we can delete them locally + for (k, v) in items.iter() { + self.table.store.transaction(|tx_db| { + if let Some(curv) = tx_db.get(k)? { + if curv == &v[..] { + tx_db.remove(&k[..])?; + } + } + Ok(()) + })?; + } + + Ok(()) + } + + fn root_checksum( self: &Arc, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver, ) -> Result { for i in 1..MAX_DEPTH { - let rc = self - .range_checksum( - &SyncRange { - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, - must_exit, - ) - .await?; + let rc = self.range_checksum( + &SyncRange { + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, + must_exit, + )?; if rc.found_limit.is_none() { return Ok(rc); } @@ -307,7 +386,7 @@ where ))) } - async fn range_checksum( + fn range_checksum( self: &Arc, range: &SyncRange, must_exit: &mut watch::Receiver, @@ -357,9 +436,7 @@ where }; let mut time = Instant::now(); while !*must_exit.borrow() { - let sub_ck = self - .range_checksum_cached_hash(&sub_range, must_exit) - .await?; + let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?; if let Some(hash) = sub_ck.hash { children.push((sub_range.clone(), hash)); @@ -397,50 +474,48 @@ where } } - fn range_checksum_cached_hash<'a>( - self: &'a Arc, - range: &'a SyncRange, - must_exit: &'a mut watch::Receiver, - ) -> BoxFuture<'a, Result> { - async move { - let mut cache = self.cache[range.level].lock().await; + fn range_checksum_cached_hash( + self: &Arc, + range: &SyncRange, + must_exit: &mut watch::Receiver, + ) -> Result { + { + let mut cache = self.cache[range.level].lock().unwrap(); if let Some(v) = cache.get(&range) { if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { return Ok(v.clone()); } } cache.remove(&range); - drop(cache); - - let v = self.range_checksum(&range, must_exit).await?; - trace!( - "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, - hex::encode(&range.begin) - .chars() - .take(16) - .collect::(), - hex::encode(&range.end).chars().take(16).collect::(), - range.level, - v.children.len() - ); - - let hash = if v.children.len() > 0 { - Some(blake2sum(&rmp_to_vec_all_named(&v)?[..])) - } else { - None - }; - let cache_entry = RangeChecksumCache { - hash, - found_limit: v.found_limit, - time: v.time, - }; - - let mut cache = self.cache[range.level].lock().await; - cache.insert(range.clone(), cache_entry.clone()); - Ok(cache_entry) } - .boxed() + + let v = self.range_checksum(&range, must_exit)?; + trace!( + "({}) New checksum calculated for {}-{}/{}, {} children", + self.table.name, + hex::encode(&range.begin) + .chars() + .take(16) + .collect::(), + hex::encode(&range.end).chars().take(16).collect::(), + range.level, + v.children.len() + ); + + let hash = if v.children.len() > 0 { + Some(blake2sum(&rmp_to_vec_all_named(&v)?[..])) + } else { + None + }; + let cache_entry = RangeChecksumCache { + hash, + found_limit: v.found_limit, + time: v.time, + }; + + let mut cache = self.cache[range.level].lock().unwrap(); + cache.insert(range.clone(), cache_entry.clone()); + Ok(cache_entry) } async fn do_sync_with( @@ -448,7 +523,6 @@ where partition: TodoPartition, root_ck: RangeChecksum, who: UUID, - retain: bool, mut must_exit: watch::Receiver, ) -> Result<(), Error> { let mut todo = VecDeque::new(); @@ -468,7 +542,7 @@ where .await?; if let TableRPC::::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { if range.level > root_ck.bounds.level { - let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?; + let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?; todo.push_back(their_root_range_ck); } else { todo.push_back(root_ck); @@ -498,7 +572,7 @@ where .rpc_client .call( who, - TableRPC::::SyncRPC(SyncRPC::Checksums(step, retain)), + TableRPC::::SyncRPC(SyncRPC::Checksums(step)), TABLE_SYNC_RPC_TIMEOUT, ) .await?; @@ -519,11 +593,11 @@ where if differing.level == 0 { items_to_send.push(differing.begin); } else { - let checksum = self.range_checksum(&differing, &mut must_exit).await?; + let checksum = self.range_checksum(&differing, &mut must_exit)?; todo.push_back(checksum); } } - if retain && diff_items.len() > 0 { + if diff_items.len() > 0 { self.table.handle_update(&diff_items[..]).await?; } if items_to_send.len() > 0 { @@ -575,11 +649,11 @@ where ) -> Result { match message { SyncRPC::GetRootChecksumRange(begin, end) => { - let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?; + let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?; Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) } - SyncRPC::Checksums(checksums, retain) => { - self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit) + SyncRPC::Checksums(checksums) => { + self.handle_checksums_rpc(&checksums[..], &mut must_exit) .await } _ => Err(Error::Message(format!("Unexpected sync RPC"))), @@ -589,14 +663,13 @@ where async fn handle_checksums_rpc( self: &Arc, checksums: &[RangeChecksum], - retain: bool, must_exit: &mut watch::Receiver, ) -> Result { let mut ret_ranges = vec![]; let mut ret_items = vec![]; for their_ckr in checksums.iter() { - let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?; + let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?; for (their_range, their_hash) in their_ckr.children.iter() { let differs = match our_ckr .children @@ -604,9 +677,8 @@ where { Err(_) => { if their_range.level >= 1 { - let cached_hash = self - .range_checksum_cached_hash(&their_range, must_exit) - .await?; + let cached_hash = + self.range_checksum_cached_hash(&their_range, must_exit)?; cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) } else { true @@ -616,7 +688,7 @@ where }; if differs { ret_ranges.push(their_range.clone()); - if retain && their_range.level == 0 { + if their_range.level == 0 { if let Some(item_bytes) = self.table.store.get(their_range.begin.as_slice())? { @@ -640,7 +712,7 @@ where if our_range.level > 0 { ret_ranges.push(our_range.clone()); } - if retain && our_range.level == 0 { + if our_range.level == 0 { if let Some(item_bytes) = self.table.store.get(our_range.begin.as_slice())? { @@ -673,7 +745,7 @@ where end: vec![], level: i, }; - let mut cache = self.cache[i].lock().await; + let mut cache = self.cache[i].lock().unwrap(); if let Some(cache_entry) = cache.range(..=needle).rev().next() { if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { let index = cache_entry.0.clone();