forked from Deuxfleurs/garage
656 lines
17 KiB
Rust
656 lines
17 KiB
Rust
use std::collections::VecDeque;
|
|
use std::convert::TryInto;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::{Duration, Instant};
|
|
|
|
use futures::future::join_all;
|
|
use futures::{pin_mut, select};
|
|
use futures_util::future::*;
|
|
use futures_util::stream::*;
|
|
use rand::Rng;
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_bytes::ByteBuf;
|
|
use tokio::sync::{mpsc, watch};
|
|
|
|
use garage_rpc::ring::Ring;
|
|
use garage_util::data::*;
|
|
use garage_util::error::Error;
|
|
|
|
use crate::data::*;
|
|
use crate::merkle::*;
|
|
use crate::replication::*;
|
|
use crate::*;
|
|
|
|
// Timeout applied to every RPC issued by the syncer (root checksum exchange,
// node fetches, item pushes and offload writes): 30 seconds.
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_millis(30_000);
|
|
|
|
// Anti-entropy period: once the syncer has had nothing to do for this long
// (10 minutes), the watcher schedules a new full sync of the table.
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(600);
|
|
|
|
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
|
data: Arc<TableData<F>>,
|
|
aux: Arc<TableAux<F, R>>,
|
|
|
|
todo: Mutex<SyncTodo>,
|
|
}
|
|
|
|
// Root checksums of a partition range: one entry per partition whose Merkle root
// node is not empty, holding the partition and the hash of that root node, in
// increasing partition order (as built by `get_root_ck`).
type RootCk = Vec<(MerklePartition, Hash)>;
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
pub struct PartitionRange {
|
|
begin: MerklePartition,
|
|
// if end is None, go all the way to partition 0xFFFF included
|
|
end: Option<MerklePartition>,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub(crate) enum SyncRPC {
|
|
RootCkHash(PartitionRange, Hash),
|
|
RootCkList(PartitionRange, RootCk),
|
|
CkNoDifference,
|
|
GetNode(MerkleNodeKey),
|
|
Node(MerkleNodeKey, MerkleNode),
|
|
}
|
|
|
|
struct SyncTodo {
|
|
todo: Vec<TodoPartition>,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct TodoPartition {
|
|
range: PartitionRange,
|
|
|
|
// Are we a node that stores this partition or not?
|
|
retain: bool,
|
|
}
|
|
|
|
impl<F, R> TableSyncer<F, R>
|
|
where
|
|
F: TableSchema + 'static,
|
|
R: TableReplication + 'static,
|
|
{
|
|
pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> {
|
|
let todo = SyncTodo { todo: vec![] };
|
|
|
|
let syncer = Arc::new(Self {
|
|
data: data.clone(),
|
|
aux: aux.clone(),
|
|
todo: Mutex::new(todo),
|
|
});
|
|
|
|
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
|
|
|
|
let s1 = syncer.clone();
|
|
aux.system.background.spawn_worker(
|
|
format!("table sync watcher for {}", data.name),
|
|
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
|
);
|
|
|
|
let s2 = syncer.clone();
|
|
aux.system.background.spawn_worker(
|
|
format!("table syncer for {}", data.name),
|
|
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
|
);
|
|
|
|
let s3 = syncer.clone();
|
|
tokio::spawn(async move {
|
|
tokio::time::delay_for(Duration::from_secs(20)).await;
|
|
s3.add_full_sync();
|
|
});
|
|
|
|
syncer
|
|
}
|
|
|
|
async fn watcher_task(
|
|
self: Arc<Self>,
|
|
mut must_exit: watch::Receiver<bool>,
|
|
mut busy_rx: mpsc::UnboundedReceiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
|
|
let mut nothing_to_do_since = Some(Instant::now());
|
|
|
|
while !*must_exit.borrow() {
|
|
let s_ring_recv = ring_recv.recv().fuse();
|
|
let s_busy = busy_rx.recv().fuse();
|
|
let s_must_exit = must_exit.recv().fuse();
|
|
let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
|
|
pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
|
|
|
|
select! {
|
|
new_ring_r = s_ring_recv => {
|
|
if new_ring_r.is_some() {
|
|
debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
|
|
self.add_full_sync();
|
|
}
|
|
}
|
|
busy_opt = s_busy => {
|
|
if let Some(busy) = busy_opt {
|
|
if busy {
|
|
nothing_to_do_since = None;
|
|
} else {
|
|
if nothing_to_do_since.is_none() {
|
|
nothing_to_do_since = Some(Instant::now());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
must_exit_v = s_must_exit => {
|
|
if must_exit_v.unwrap_or(false) {
|
|
break;
|
|
}
|
|
}
|
|
_ = s_timeout => {
|
|
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
|
nothing_to_do_since = None;
|
|
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
|
|
self.add_full_sync();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn add_full_sync(&self) {
|
|
self.todo
|
|
.lock()
|
|
.unwrap()
|
|
.add_full_sync(&self.data, &self.aux);
|
|
}
|
|
|
|
async fn syncer_task(
|
|
self: Arc<Self>,
|
|
mut must_exit: watch::Receiver<bool>,
|
|
busy_tx: mpsc::UnboundedSender<bool>,
|
|
) -> Result<(), Error> {
|
|
while !*must_exit.borrow() {
|
|
let task = self.todo.lock().unwrap().pop_task();
|
|
if let Some(partition) = task {
|
|
busy_tx.send(true)?;
|
|
let res = self
|
|
.clone()
|
|
.sync_partition(&partition, &mut must_exit)
|
|
.await;
|
|
if let Err(e) = res {
|
|
warn!(
|
|
"({}) Error while syncing {:?}: {}",
|
|
self.data.name, partition, e
|
|
);
|
|
}
|
|
} else {
|
|
busy_tx.send(false)?;
|
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn sync_partition(
|
|
self: Arc<Self>,
|
|
partition: &TodoPartition,
|
|
must_exit: &mut watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
if partition.retain {
|
|
let my_id = self.aux.system.id;
|
|
|
|
let nodes = self
|
|
.aux
|
|
.replication
|
|
.write_nodes(
|
|
&hash_of_merkle_partition(partition.range.begin),
|
|
&self.aux.system,
|
|
)
|
|
.into_iter()
|
|
.filter(|node| *node != my_id)
|
|
.collect::<Vec<_>>();
|
|
|
|
debug!(
|
|
"({}) Syncing {:?} with {:?}...",
|
|
self.data.name, partition, nodes
|
|
);
|
|
let mut sync_futures = nodes
|
|
.iter()
|
|
.map(|node| {
|
|
self.clone()
|
|
.do_sync_with(partition.clone(), *node, must_exit.clone())
|
|
})
|
|
.collect::<FuturesUnordered<_>>();
|
|
|
|
let mut n_errors = 0;
|
|
while let Some(r) = sync_futures.next().await {
|
|
if let Err(e) = r {
|
|
n_errors += 1;
|
|
warn!("({}) Sync error: {}", self.data.name, e);
|
|
}
|
|
}
|
|
if n_errors > self.aux.replication.max_write_errors() {
|
|
return Err(Error::Message(format!(
|
|
"Sync failed with too many nodes (should have been: {:?}).",
|
|
nodes
|
|
)));
|
|
}
|
|
} else {
|
|
self.offload_partition(
|
|
&hash_of_merkle_partition(partition.range.begin),
|
|
&hash_of_merkle_partition_opt(partition.range.end),
|
|
must_exit,
|
|
)
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Offload partition: this partition is not something we are storing,
|
|
// so send it out to all other nodes that store it and delete items locally.
|
|
// We don't bother checking if the remote nodes already have the items,
|
|
// we just batch-send everything. Offloading isn't supposed to happen very often.
|
|
// If any of the nodes that are supposed to store the items is unable to
|
|
// save them, we interrupt the process.
|
|
async fn offload_partition(
|
|
self: &Arc<Self>,
|
|
begin: &Hash,
|
|
end: &Hash,
|
|
must_exit: &mut watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let mut counter: usize = 0;
|
|
|
|
while !*must_exit.borrow() {
|
|
let mut items = Vec::new();
|
|
|
|
for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
|
|
let (key, value) = item?;
|
|
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
|
|
|
|
if items.len() >= 1024 {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if items.len() > 0 {
|
|
let nodes = self
|
|
.aux
|
|
.replication
|
|
.write_nodes(&begin, &self.aux.system)
|
|
.into_iter()
|
|
.collect::<Vec<_>>();
|
|
if nodes.contains(&self.aux.system.id) {
|
|
warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name);
|
|
break;
|
|
}
|
|
if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) {
|
|
return Err(Error::Message(format!("Not offloading as we don't have a quorum of nodes to write to.")));
|
|
}
|
|
|
|
counter += 1;
|
|
info!(
|
|
"({}) Offloading {} items from {:?}..{:?} ({})",
|
|
self.data.name,
|
|
items.len(),
|
|
begin,
|
|
end,
|
|
counter
|
|
);
|
|
self.offload_items(&items, &nodes[..]).await?;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn offload_items(
|
|
self: &Arc<Self>,
|
|
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
|
|
nodes: &[UUID],
|
|
) -> Result<(), Error> {
|
|
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
|
let update_msg = Arc::new(TableRPC::<F>::Update(values));
|
|
|
|
for res in join_all(nodes.iter().map(|to| {
|
|
self.aux
|
|
.rpc_client
|
|
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
|
|
}))
|
|
.await
|
|
{
|
|
res?;
|
|
}
|
|
|
|
// All remote nodes have written those items, now we can delete them locally
|
|
let mut not_removed = 0;
|
|
for (k, v) in items.iter() {
|
|
if !self.data.delete_if_equal(&k[..], &v[..])? {
|
|
not_removed += 1;
|
|
}
|
|
}
|
|
|
|
if not_removed > 0 {
|
|
debug!("({}) {} items not removed during offload because they changed in between (trying again...)", self.data.name, not_removed);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
|
|
// The driver side is only concerned with sending out the item it has
|
|
// and the other side might not have. Receiving items that differ from one
|
|
// side to the other will happen when the other side syncs with us,
|
|
// which they also do regularly.
|
|
|
|
fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
|
|
let begin = u16::from_be_bytes(range.begin);
|
|
let range_iter = match range.end {
|
|
Some(end) => {
|
|
let end = u16::from_be_bytes(end);
|
|
begin..=(end - 1)
|
|
}
|
|
None => begin..=0xFFFF,
|
|
};
|
|
|
|
let mut ret = vec![];
|
|
for i in range_iter {
|
|
let key = MerkleNodeKey {
|
|
partition: u16::to_be_bytes(i),
|
|
prefix: vec![],
|
|
};
|
|
match self.data.merkle_updater.read_node(&key)? {
|
|
MerkleNode::Empty => (),
|
|
x => {
|
|
ret.push((key.partition, hash_of(&x)?));
|
|
}
|
|
}
|
|
}
|
|
Ok(ret)
|
|
}
|
|
|
|
async fn do_sync_with(
|
|
self: Arc<Self>,
|
|
partition: TodoPartition,
|
|
who: UUID,
|
|
must_exit: watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let root_ck = self.get_root_ck(partition.range)?;
|
|
if root_ck.is_empty() {
|
|
debug!(
|
|
"({}) Sync {:?} with {:?}: partition is empty.",
|
|
self.data.name, partition, who
|
|
);
|
|
return Ok(())
|
|
}
|
|
|
|
let root_ck_hash = hash_of(&root_ck)?;
|
|
|
|
// If their root checksum has level > than us, use that as a reference
|
|
let root_resp = self
|
|
.aux
|
|
.rpc_client
|
|
.call(
|
|
who,
|
|
TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)),
|
|
TABLE_SYNC_RPC_TIMEOUT,
|
|
)
|
|
.await?;
|
|
|
|
let mut todo = match root_resp {
|
|
TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => {
|
|
debug!(
|
|
"({}) Sync {:?} with {:?}: no difference",
|
|
self.data.name, partition, who
|
|
);
|
|
return Ok(());
|
|
}
|
|
TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => {
|
|
let join = join_ordered(&root_ck[..], &their_root_ck[..]);
|
|
let mut todo = VecDeque::new();
|
|
for (p, v1, v2) in join.iter() {
|
|
let diff = match (v1, v2) {
|
|
(Some(_), None) | (None, Some(_)) => true,
|
|
(Some(a), Some(b)) => a != b,
|
|
_ => false,
|
|
};
|
|
if diff {
|
|
todo.push_back(MerkleNodeKey {
|
|
partition: **p,
|
|
prefix: vec![],
|
|
});
|
|
}
|
|
}
|
|
debug!(
|
|
"({}) Sync {:?} with {:?}: todo.len() = {}",
|
|
self.data.name,
|
|
partition,
|
|
who,
|
|
todo.len()
|
|
);
|
|
todo
|
|
}
|
|
x => {
|
|
return Err(Error::Message(format!(
|
|
"Invalid respone to RootCkHash RPC: {}",
|
|
debug_serialize(x)
|
|
)));
|
|
}
|
|
};
|
|
|
|
let mut todo_items = vec![];
|
|
|
|
while !todo.is_empty() && !*must_exit.borrow() {
|
|
let key = todo.pop_front().unwrap();
|
|
let node = self.data.merkle_updater.read_node(&key)?;
|
|
|
|
match node {
|
|
MerkleNode::Empty => {
|
|
// They have items we don't have.
|
|
// We don't request those items from them, they will send them.
|
|
// We only bother with pushing items that differ
|
|
}
|
|
MerkleNode::Leaf(ik, ivhash) => {
|
|
// Just send that item directly
|
|
if let Some(val) = self.data.store.get(&ik[..])? {
|
|
if blake2sum(&val[..]) != ivhash {
|
|
warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
|
|
}
|
|
todo_items.push(val.to_vec());
|
|
} else {
|
|
warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", self.data.name, ik);
|
|
}
|
|
}
|
|
MerkleNode::Intermediate(l) => {
|
|
// Get Merkle node for this tree position at remote node
|
|
// and compare it with local node
|
|
let remote_node = match self
|
|
.aux
|
|
.rpc_client
|
|
.call(
|
|
who,
|
|
TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
|
|
TABLE_SYNC_RPC_TIMEOUT,
|
|
)
|
|
.await?
|
|
{
|
|
TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node,
|
|
x => {
|
|
return Err(Error::Message(format!(
|
|
"Invalid respone to GetNode RPC: {}",
|
|
debug_serialize(x)
|
|
)));
|
|
}
|
|
};
|
|
let int_l2 = match remote_node {
|
|
// If they have an intermediate node at this tree position,
|
|
// we can compare them to find differences
|
|
MerkleNode::Intermediate(l2) => l2,
|
|
// Otherwise, treat it as if they have nothing for this subtree,
|
|
// which will have the consequence of sending them everything
|
|
_ => vec![],
|
|
};
|
|
|
|
let join = join_ordered(&l[..], &int_l2[..]);
|
|
for (p, v1, v2) in join.into_iter() {
|
|
let diff = match (v1, v2) {
|
|
(Some(_), None) | (None, Some(_)) => true,
|
|
(Some(a), Some(b)) => a != b,
|
|
_ => false,
|
|
};
|
|
if diff {
|
|
todo.push_back(key.add_byte(*p));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if todo_items.len() >= 256 {
|
|
self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
|
|
.await?;
|
|
}
|
|
}
|
|
|
|
if !todo_items.is_empty() {
|
|
self.send_items(who, todo_items).await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
|
info!(
|
|
"({}) Sending {} items to {:?}",
|
|
self.data.name,
|
|
item_value_list.len(),
|
|
who
|
|
);
|
|
|
|
let values = item_value_list.into_iter()
|
|
.map(|x| Arc::new(ByteBuf::from(x)))
|
|
.collect::<Vec<_>>();
|
|
|
|
let rpc_resp = self
|
|
.aux
|
|
.rpc_client
|
|
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
|
|
.await?;
|
|
if let TableRPC::<F>::Ok = rpc_resp {
|
|
Ok(())
|
|
} else {
|
|
Err(Error::Message(format!(
|
|
"Unexpected response to RPC Update: {}",
|
|
debug_serialize(&rpc_resp)
|
|
)))
|
|
}
|
|
}
|
|
|
|
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
|
|
|
|
pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
|
|
match message {
|
|
SyncRPC::RootCkHash(range, h) => {
|
|
let root_ck = self.get_root_ck(*range)?;
|
|
let hash = hash_of(&root_ck)?;
|
|
if hash == *h {
|
|
Ok(SyncRPC::CkNoDifference)
|
|
} else {
|
|
Ok(SyncRPC::RootCkList(*range, root_ck))
|
|
}
|
|
}
|
|
SyncRPC::GetNode(k) => {
|
|
let node = self.data.merkle_updater.read_node(&k)?;
|
|
Ok(SyncRPC::Node(k.clone(), node))
|
|
}
|
|
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl SyncTodo {
|
|
fn add_full_sync<F: TableSchema, R: TableReplication>(
|
|
&mut self,
|
|
data: &TableData<F>,
|
|
aux: &TableAux<F, R>,
|
|
) {
|
|
let my_id = aux.system.id;
|
|
|
|
self.todo.clear();
|
|
|
|
let ring = aux.system.ring.borrow().clone();
|
|
let split_points = aux.replication.split_points(&ring);
|
|
|
|
for i in 0..split_points.len() {
|
|
let begin: MerklePartition = {
|
|
let b = split_points[i];
|
|
assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
|
|
b.as_slice()[..2].try_into().unwrap()
|
|
};
|
|
|
|
let end: Option<MerklePartition> = if i + 1 < split_points.len() {
|
|
let e = split_points[i + 1];
|
|
assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
|
|
Some(e.as_slice()[..2].try_into().unwrap())
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let begin_hash = hash_of_merkle_partition(begin);
|
|
let end_hash = hash_of_merkle_partition_opt(end);
|
|
|
|
let nodes = aux.replication.replication_nodes(&begin_hash, &ring);
|
|
|
|
let retain = nodes.contains(&my_id);
|
|
if !retain {
|
|
// Check if we have some data to send, otherwise skip
|
|
if data.store.range(begin_hash..end_hash).next().is_none() {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
self.todo.push(TodoPartition {
|
|
range: PartitionRange { begin, end },
|
|
retain,
|
|
});
|
|
}
|
|
}
|
|
|
|
fn pop_task(&mut self) -> Option<TodoPartition> {
|
|
if self.todo.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
|
|
if i == self.todo.len() - 1 {
|
|
self.todo.pop()
|
|
} else {
|
|
let replacement = self.todo.pop().unwrap();
|
|
let ret = std::mem::replace(&mut self.todo[i], replacement);
|
|
Some(ret)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
|
|
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
|
|
}
|
|
|
|
/// Merge-joins two slices of `(key, value)` pairs that are sorted by key.
///
/// Walks both inputs in lockstep and returns one entry per step, in key order:
/// - `(key, Some(v1), Some(v2))` when both sides hold the key;
/// - `None` on the side that lacks the key otherwise.
///
/// The inputs are expected to be sorted by key. If they are not, the pairing is
/// positional rather than a true join.
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
    x: &'a [(K, V1)],
    y: &'a [(K, V2)],
) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
    use std::cmp::Ordering;

    let mut left = x.iter().peekable();
    let mut right = y.iter().peekable();
    let mut joined = vec![];

    loop {
        // Decide which side(s) to advance; an exhausted side never gets picked.
        let order = match (left.peek(), right.peek()) {
            (None, None) => break,
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (Some((kx, _)), Some((ky, _))) => kx.cmp(ky),
        };
        match order {
            Ordering::Equal => {
                let (k, v1) = left.next().unwrap();
                let (_, v2) = right.next().unwrap();
                joined.push((k, Some(v1), Some(v2)));
            }
            Ordering::Less => {
                let (k, v1) = left.next().unwrap();
                joined.push((k, Some(v1), None));
            }
            Ordering::Greater => {
                let (k, v2) = right.next().unwrap();
                joined.push((k, None, Some(v2)));
            }
        }
    }

    joined
}
|