Several changes in table_sync:

- separate path for case of offloading a partition we don't store
- use sync::Mutex instead of tokio::Mutex, make less fn's async
This commit is contained in:
Alex 2021-02-23 19:11:02 +01:00
parent 40763fd749
commit 55156cca9d
2 changed files with 191 additions and 119 deletions

View file

@ -435,7 +435,7 @@ where
let syncer = self.syncer.load_full().unwrap();
debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
let mut count = 0;
let mut count: usize = 0;
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
if key.as_ref() < begin.as_slice() {
break;

View file

@ -1,15 +1,14 @@
use rand::Rng;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::future::BoxFuture;
use futures::future::join_all;
use futures::{pin_mut, select};
use futures_util::future::*;
use futures_util::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch};
use garage_rpc::ring::Ring;
@ -33,7 +32,7 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
pub enum SyncRPC {
GetRootChecksumRange(Hash, Hash),
RootChecksumRange(SyncRange),
Checksums(Vec<RangeChecksum>, bool),
Checksums(Vec<RangeChecksum>),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
@ -43,8 +42,11 @@ pub struct SyncTodo {
#[derive(Debug, Clone)]
struct TodoPartition {
// Partition consists in hashes between begin included and end excluded
begin: Hash,
end: Hash,
// Are we a node that stores this partition or not?
retain: bool,
}
@ -161,7 +163,7 @@ where
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
debug!("({}) Adding ring difference to syncer todo list", self.table.name);
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
}
@ -194,7 +196,7 @@ where
}
pub async fn add_full_scan(&self) {
self.todo.lock().await.add_full_scan(&self.table);
self.todo.lock().unwrap().add_full_scan(&self.table);
}
async fn syncer_task(
@ -203,7 +205,8 @@ where
busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(partition) = self.todo.lock().await.pop_task() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
busy_tx.send(true)?;
let res = self
.clone()
@ -228,76 +231,152 @@ where
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
let my_id = self.table.system.id;
let nodes = self
.table
.replication
.write_nodes(&partition.begin, &self.table.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
if partition.retain {
let my_id = self.table.system.id;
let nodes = self
.table
.replication
.write_nodes(&partition.begin, &self.table.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Preparing to sync {:?} with {:?}...",
self.table.name, partition, nodes
);
let root_cks = self
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
debug!(
"({}) Preparing to sync {:?} with {:?}...",
self.table.name, partition, nodes
);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
let mut sync_futures = nodes
.iter()
.map(|node| {
self.clone().do_sync_with(
partition.clone(),
root_cks.clone(),
*node,
partition.retain,
must_exit.clone(),
)
})
.collect::<FuturesUnordered<_>>();
let mut sync_futures = nodes
.iter()
.map(|node| {
self.clone().do_sync_with(
partition.clone(),
root_cks.clone(),
*node,
must_exit.clone(),
)
})
.collect::<FuturesUnordered<_>>();
let mut n_errors = 0;
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
warn!("({}) Sync error: {}", self.table.name, e);
let mut n_errors = 0;
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
warn!("({}) Sync error: {}", self.table.name, e);
}
}
}
if n_errors > self.table.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
)));
}
if !partition.retain {
self.table
.delete_range(&partition.begin, &partition.end)
if n_errors > self.table.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
)));
}
} else {
self.offload_partition(&partition.begin, &partition.end, must_exit)
.await?;
}
Ok(())
}
async fn root_checksum(
// Offload partition: this partition is not something we are storing,
// so send it out to all other nodes that store it and delete items locally.
// We don't bother checking if the remote nodes already have the items,
// we just batch-send everything. Offloading isn't supposed to happen very often.
// If any of the nodes that are supposed to store the items is unable to
// save them, we interrupt the process.
async fn offload_partition(
self: &Arc<Self>,
begin: &Hash,
end: &Hash,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
let mut counter: usize = 0;
while !*must_exit.borrow() {
let mut items = Vec::new();
for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
if items.len() >= 1024 {
break;
}
}
if items.len() > 0 {
let nodes = self
.table
.replication
.write_nodes(&begin, &self.table.system)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.table.system.id) {
warn!("Interrupting offload as partitions seem to have changed");
break;
}
counter += 1;
debug!("Offloading items from {:?}..{:?} ({})", begin, end, counter);
self.offload_items(&items, &nodes[..]).await?;
} else {
break;
}
}
Ok(())
}
async fn offload_items(
self: &Arc<Self>,
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
let update_msg = Arc::new(TableRPC::<F>::Update(values));
for res in join_all(nodes.iter().map(|to| {
self.table
.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
}))
.await
{
res?;
}
// All remote nodes have written those items, now we can delete them locally
for (k, v) in items.iter() {
self.table.store.transaction(|tx_db| {
if let Some(curv) = tx_db.get(k)? {
if curv == &v[..] {
tx_db.remove(&k[..])?;
}
}
Ok(())
})?;
}
Ok(())
}
fn root_checksum(
self: &Arc<Self>,
begin: &Hash,
end: &Hash,
must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksum, Error> {
for i in 1..MAX_DEPTH {
let rc = self
.range_checksum(
&SyncRange {
begin: begin.to_vec(),
end: end.to_vec(),
level: i,
},
must_exit,
)
.await?;
let rc = self.range_checksum(
&SyncRange {
begin: begin.to_vec(),
end: end.to_vec(),
level: i,
},
must_exit,
)?;
if rc.found_limit.is_none() {
return Ok(rc);
}
@ -307,7 +386,7 @@ where
)))
}
async fn range_checksum(
fn range_checksum(
self: &Arc<Self>,
range: &SyncRange,
must_exit: &mut watch::Receiver<bool>,
@ -357,9 +436,7 @@ where
};
let mut time = Instant::now();
while !*must_exit.borrow() {
let sub_ck = self
.range_checksum_cached_hash(&sub_range, must_exit)
.await?;
let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
if let Some(hash) = sub_ck.hash {
children.push((sub_range.clone(), hash));
@ -397,50 +474,48 @@ where
}
}
fn range_checksum_cached_hash<'a>(
self: &'a Arc<Self>,
range: &'a SyncRange,
must_exit: &'a mut watch::Receiver<bool>,
) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> {
async move {
let mut cache = self.cache[range.level].lock().await;
fn range_checksum_cached_hash(
self: &Arc<Self>,
range: &SyncRange,
must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksumCache, Error> {
{
let mut cache = self.cache[range.level].lock().unwrap();
if let Some(v) = cache.get(&range) {
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
return Ok(v.clone());
}
}
cache.remove(&range);
drop(cache);
let v = self.range_checksum(&range, must_exit).await?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin)
.chars()
.take(16)
.collect::<String>(),
hex::encode(&range.end).chars().take(16).collect::<String>(),
range.level,
v.children.len()
);
let hash = if v.children.len() > 0 {
Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
} else {
None
};
let cache_entry = RangeChecksumCache {
hash,
found_limit: v.found_limit,
time: v.time,
};
let mut cache = self.cache[range.level].lock().await;
cache.insert(range.clone(), cache_entry.clone());
Ok(cache_entry)
}
.boxed()
let v = self.range_checksum(&range, must_exit)?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin)
.chars()
.take(16)
.collect::<String>(),
hex::encode(&range.end).chars().take(16).collect::<String>(),
range.level,
v.children.len()
);
let hash = if v.children.len() > 0 {
Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
} else {
None
};
let cache_entry = RangeChecksumCache {
hash,
found_limit: v.found_limit,
time: v.time,
};
let mut cache = self.cache[range.level].lock().unwrap();
cache.insert(range.clone(), cache_entry.clone());
Ok(cache_entry)
}
async fn do_sync_with(
@ -448,7 +523,6 @@ where
partition: TodoPartition,
root_ck: RangeChecksum,
who: UUID,
retain: bool,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut todo = VecDeque::new();
@ -468,7 +542,7 @@ where
.await?;
if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
if range.level > root_ck.bounds.level {
let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?;
let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
todo.push_back(their_root_range_ck);
} else {
todo.push_back(root_ck);
@ -498,7 +572,7 @@ where
.rpc_client
.call(
who,
TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
@ -519,11 +593,11 @@ where
if differing.level == 0 {
items_to_send.push(differing.begin);
} else {
let checksum = self.range_checksum(&differing, &mut must_exit).await?;
let checksum = self.range_checksum(&differing, &mut must_exit)?;
todo.push_back(checksum);
}
}
if retain && diff_items.len() > 0 {
if diff_items.len() > 0 {
self.table.handle_update(&diff_items[..]).await?;
}
if items_to_send.len() > 0 {
@ -575,11 +649,11 @@ where
) -> Result<SyncRPC, Error> {
match message {
SyncRPC::GetRootChecksumRange(begin, end) => {
let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?;
let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
}
SyncRPC::Checksums(checksums, retain) => {
self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit)
SyncRPC::Checksums(checksums) => {
self.handle_checksums_rpc(&checksums[..], &mut must_exit)
.await
}
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
@ -589,14 +663,13 @@ where
async fn handle_checksums_rpc(
self: &Arc<Self>,
checksums: &[RangeChecksum],
retain: bool,
must_exit: &mut watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
let mut ret_ranges = vec![];
let mut ret_items = vec![];
for their_ckr in checksums.iter() {
let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?;
let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
for (their_range, their_hash) in their_ckr.children.iter() {
let differs = match our_ckr
.children
@ -604,9 +677,8 @@ where
{
Err(_) => {
if their_range.level >= 1 {
let cached_hash = self
.range_checksum_cached_hash(&their_range, must_exit)
.await?;
let cached_hash =
self.range_checksum_cached_hash(&their_range, must_exit)?;
cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
} else {
true
@ -616,7 +688,7 @@ where
};
if differs {
ret_ranges.push(their_range.clone());
if retain && their_range.level == 0 {
if their_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(their_range.begin.as_slice())?
{
@ -640,7 +712,7 @@ where
if our_range.level > 0 {
ret_ranges.push(our_range.clone());
}
if retain && our_range.level == 0 {
if our_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(our_range.begin.as_slice())?
{
@ -673,7 +745,7 @@ where
end: vec![],
level: i,
};
let mut cache = self.cache[i].lock().await;
let mut cache = self.cache[i].lock().unwrap();
if let Some(cache_entry) = cache.range(..=needle).rev().next() {
if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
let index = cache_entry.0.clone();