garage/src/table/sync.rs
2021-03-12 21:52:19 +01:00

692 lines
17 KiB
Rust

use std::collections::VecDeque;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::{pin_mut, select};
use futures_util::future::*;
use futures_util::stream::*;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*;
use crate::merkle::*;
use crate::replication::*;
use crate::*;
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
data: Arc<TableData<F>>,
aux: Arc<TableAux<R>>,
todo: Mutex<SyncTodo>,
rpc_client: Arc<RpcClient<SyncRPC>>,
}
type RootCk = Vec<(MerklePartition, Hash)>;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PartitionRange {
begin: MerklePartition,
// if end is None, go all the way to partition 0xFFFF included
end: Option<MerklePartition>,
}
#[derive(Serialize, Deserialize)]
pub(crate) enum SyncRPC {
RootCkHash(PartitionRange, Hash),
RootCkList(PartitionRange, RootCk),
CkNoDifference,
GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
}
impl RpcMessage for SyncRPC {}
struct SyncTodo {
todo: Vec<TodoPartition>,
}
#[derive(Debug, Clone)]
struct TodoPartition {
range: PartitionRange,
// Are we a node that stores this partition or not?
retain: bool,
}
impl<F, R> TableSyncer<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(
data: Arc<TableData<F>>,
aux: Arc<TableAux<R>>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path);
let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self {
data: data.clone(),
aux: aux.clone(),
todo: Mutex::new(todo),
rpc_client,
});
syncer.register_handler(rpc_server, rpc_path);
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
aux.system.background.spawn_worker(
format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
aux.system.background.spawn_worker(
format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(20)).await;
s3.add_full_sync();
});
syncer
}
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
let self2 = self.clone();
self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
}
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
let s_busy = busy_rx.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
select! {
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
if !Arc::ptr_eq(&new_ring, &prev_ring) {
debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
self.add_full_sync();
prev_ring = new_ring;
}
}
}
busy_opt = s_busy => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
} else {
if nothing_to_do_since.is_none() {
nothing_to_do_since = Some(Instant::now());
}
}
}
}
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
break;
}
}
_ = s_timeout => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
self.add_full_sync();
}
}
}
}
Ok(())
}
pub fn add_full_sync(&self) {
self.todo
.lock()
.unwrap()
.add_full_sync(&self.data, &self.aux);
}
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
busy_tx.send(true)?;
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
.await;
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
self.data.name, partition, e
);
}
} else {
busy_tx.send(false)?;
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}
Ok(())
}
async fn sync_partition(
self: Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
let my_id = self.aux.system.id;
let nodes = self
.aux
.replication
.write_nodes(
&hash_of_merkle_partition(partition.range.begin),
&self.aux.system,
)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Syncing {:?} with {:?}...",
self.data.name, partition, nodes
);
let mut sync_futures = nodes
.iter()
.map(|node| {
self.clone()
.do_sync_with(partition.clone(), *node, must_exit.clone())
})
.collect::<FuturesUnordered<_>>();
let mut n_errors = 0;
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
warn!("({}) Sync error: {}", self.data.name, e);
}
}
if n_errors > self.aux.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
)));
}
} else {
self.offload_partition(
&hash_of_merkle_partition(partition.range.begin),
&hash_of_merkle_partition_opt(partition.range.end),
must_exit,
)
.await?;
}
Ok(())
}
// Offload partition: this partition is not something we are storing,
// so send it out to all other nodes that store it and delete items locally.
// We don't bother checking if the remote nodes already have the items,
// we just batch-send everything. Offloading isn't supposed to happen very often.
// If any of the nodes that are supposed to store the items is unable to
// save them, we interrupt the process.
async fn offload_partition(
self: &Arc<Self>,
begin: &Hash,
end: &Hash,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
let mut counter: usize = 0;
while !*must_exit.borrow() {
let mut items = Vec::new();
for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
if items.len() >= 1024 {
break;
}
}
if items.len() > 0 {
let nodes = self
.aux
.replication
.write_nodes(&begin, &self.aux.system)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.aux.system.id) {
warn!(
"({}) Interrupting offload as partitions seem to have changed",
self.data.name
);
break;
}
if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) {
return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to."
)));
}
counter += 1;
info!(
"({}) Offloading {} items from {:?}..{:?} ({})",
self.data.name,
items.len(),
begin,
end,
counter
);
self.offload_items(&items, &nodes[..]).await?;
} else {
break;
}
}
Ok(())
}
async fn offload_items(
self: &Arc<Self>,
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
self.rpc_client
.try_call_many(
&nodes[..],
SyncRPC::Items(values),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
for (k, v) in items.iter() {
if !self.data.delete_if_equal(&k[..], &v[..])? {
not_removed += 1;
}
}
if not_removed > 0 {
debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
}
Ok(())
}
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
// The driver side is only concerned with sending out the item it has
// and the other side might not have. Receiving items that differ from one
// side to the other will happen when the other side syncs with us,
// which they also do regularly.
fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
let begin = u16::from_be_bytes(range.begin);
let range_iter = match range.end {
Some(end) => {
let end = u16::from_be_bytes(end);
begin..=(end - 1)
}
None => begin..=0xFFFF,
};
let mut ret = vec![];
for i in range_iter {
let key = MerkleNodeKey {
partition: u16::to_be_bytes(i),
prefix: vec![],
};
match self.data.merkle_updater.read_node(&key)? {
MerkleNode::Empty => (),
x => {
ret.push((key.partition, hash_of(&x)?));
}
}
}
Ok(ret)
}
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
who: UUID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let root_ck = self.get_root_ck(partition.range)?;
if root_ck.is_empty() {
debug!(
"({}) Sync {:?} with {:?}: partition is empty.",
self.data.name, partition, who
);
return Ok(());
}
let root_ck_hash = hash_of(&root_ck)?;
// If their root checksum has level > than us, use that as a reference
let root_resp = self
.rpc_client
.call(
who,
SyncRPC::RootCkHash(partition.range, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
SyncRPC::CkNoDifference => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
SyncRPC::RootCkList(_, their_root_ck) => {
let join = join_ordered(&root_ck[..], &their_root_ck[..]);
let mut todo = VecDeque::new();
for (p, v1, v2) in join.iter() {
let diff = match (v1, v2) {
(Some(_), None) | (None, Some(_)) => true,
(Some(a), Some(b)) => a != b,
_ => false,
};
if diff {
todo.push_back(MerkleNodeKey {
partition: **p,
prefix: vec![],
});
}
}
debug!(
"({}) Sync {:?} with {:?}: todo.len() = {}",
self.data.name,
partition,
who,
todo.len()
);
todo
}
x => {
return Err(Error::Message(format!(
"Invalid respone to RootCkHash RPC: {}",
debug_serialize(x)
)));
}
};
let mut todo_items = vec![];
while !todo.is_empty() && !*must_exit.borrow() {
let key = todo.pop_front().unwrap();
let node = self.data.merkle_updater.read_node(&key)?;
match node {
MerkleNode::Empty => {
// They have items we don't have.
// We don't request those items from them, they will send them.
// We only bother with pushing items that differ
}
MerkleNode::Leaf(ik, ivhash) => {
// Just send that item directly
if let Some(val) = self.data.store.get(&ik[..])? {
if blake2sum(&val[..]) != ivhash {
warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
}
todo_items.push(val.to_vec());
} else {
warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
}
}
MerkleNode::Intermediate(l) => {
// Get Merkle node for this tree position at remote node
// and compare it with local node
let remote_node = match self
.rpc_client
.call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
.await?
{
SyncRPC::Node(_, node) => node,
x => {
return Err(Error::Message(format!(
"Invalid respone to GetNode RPC: {}",
debug_serialize(x)
)));
}
};
let int_l2 = match remote_node {
// If they have an intermediate node at this tree position,
// we can compare them to find differences
MerkleNode::Intermediate(l2) => l2,
// Otherwise, treat it as if they have nothing for this subtree,
// which will have the consequence of sending them everything
_ => vec![],
};
let join = join_ordered(&l[..], &int_l2[..]);
for (p, v1, v2) in join.into_iter() {
let diff = match (v1, v2) {
(Some(_), None) | (None, Some(_)) => true,
(Some(a), Some(b)) => a != b,
_ => false,
};
if diff {
todo.push_back(key.add_byte(*p));
}
}
}
}
if todo_items.len() >= 256 {
self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
.await?;
}
}
if !todo_items.is_empty() {
self.send_items(who, todo_items).await?;
}
Ok(())
}
async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
item_value_list.len(),
who
);
let values = item_value_list
.into_iter()
.map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>();
let rpc_resp = self
.rpc_client
.call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
if let SyncRPC::Ok = rpc_resp {
Ok(())
} else {
Err(Error::Message(format!(
"Unexpected response to RPC Update: {}",
debug_serialize(&rpc_resp)
)))
}
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message {
SyncRPC::RootCkHash(range, h) => {
let root_ck = self.get_root_ck(*range)?;
let hash = hash_of(&root_ck)?;
if hash == *h {
Ok(SyncRPC::CkNoDifference)
} else {
Ok(SyncRPC::RootCkList(*range, root_ck))
}
}
SyncRPC::GetNode(k) => {
let node = self.data.merkle_updater.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node))
}
SyncRPC::Items(items) => {
self.data.update_many(items)?;
Ok(SyncRPC::Ok)
}
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
}
}
}
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
data: &TableData<F>,
aux: &TableAux<R>,
) {
let my_id = aux.system.id;
self.todo.clear();
let ring = aux.system.ring.borrow().clone();
let split_points = aux.replication.split_points(&ring);
for i in 0..split_points.len() {
let begin: MerklePartition = {
let b = split_points[i];
assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
b.as_slice()[..2].try_into().unwrap()
};
let end: Option<MerklePartition> = if i + 1 < split_points.len() {
let e = split_points[i + 1];
assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
Some(e.as_slice()[..2].try_into().unwrap())
} else {
None
};
let begin_hash = hash_of_merkle_partition(begin);
let end_hash = hash_of_merkle_partition_opt(end);
let nodes = aux.replication.replication_nodes(&begin_hash, &ring);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
if data.store.range(begin_hash..end_hash).next().is_none() {
continue;
}
}
self.todo.push(TodoPartition {
range: PartitionRange { begin, end },
retain,
});
}
}
fn pop_task(&mut self) -> Option<TodoPartition> {
if self.todo.is_empty() {
return None;
}
let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
if i == self.todo.len() - 1 {
self.todo.pop()
} else {
let replacement = self.todo.pop().unwrap();
let ret = std::mem::replace(&mut self.todo[i], replacement);
Some(ret)
}
}
}
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
x: &'a [(K, V1)],
y: &'a [(K, V2)],
) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
let mut ret = vec![];
let mut i = 0;
let mut j = 0;
while i < x.len() || j < y.len() {
if i < x.len() && j < y.len() && x[i].0 == y[j].0 {
ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1)));
i += 1;
j += 1;
} else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) {
ret.push((&x[i].0, Some(&x[i].1), None));
i += 1;
} else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) {
ret.push((&y[j].0, None, Some(&y[j].1)));
j += 1;
} else {
unreachable!();
}
}
ret
}