forked from Deuxfleurs/garage
626 lines
16 KiB
Rust
626 lines
16 KiB
Rust
use std::collections::VecDeque;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use arc_swap::ArcSwapOption;
|
|
use async_trait::async_trait;
|
|
use futures_util::stream::*;
|
|
use opentelemetry::KeyValue;
|
|
use rand::prelude::*;
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_bytes::ByteBuf;
|
|
use tokio::select;
|
|
use tokio::sync::{mpsc, watch, Notify};
|
|
|
|
use garage_util::background::*;
|
|
use garage_util::data::*;
|
|
use garage_util::encode::{debug_serialize, nonversioned_encode};
|
|
use garage_util::error::{Error, OkOrMessage};
|
|
|
|
use garage_rpc::layout::*;
|
|
use garage_rpc::system::System;
|
|
use garage_rpc::*;
|
|
|
|
use crate::data::*;
|
|
use crate::merkle::*;
|
|
use crate::replication::*;
|
|
use crate::*;
|
|
|
|
// Do anti-entropy every 10 minutes
|
|
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
|
|
|
|
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
|
system: Arc<System>,
|
|
data: Arc<TableData<F, R>>,
|
|
merkle: Arc<MerkleUpdater<F, R>>,
|
|
|
|
add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
|
|
endpoint: Arc<Endpoint<SyncRpc, Self>>,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub(crate) enum SyncRpc {
|
|
RootCkHash(Partition, Hash),
|
|
RootCkDifferent(bool),
|
|
GetNode(MerkleNodeKey),
|
|
Node(MerkleNodeKey, MerkleNode),
|
|
Items(Vec<Arc<ByteBuf>>),
|
|
Ok,
|
|
}
|
|
|
|
impl Rpc for SyncRpc {
|
|
type Response = Result<SyncRpc, Error>;
|
|
}
|
|
|
|
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
|
pub(crate) fn new(
|
|
system: Arc<System>,
|
|
data: Arc<TableData<F, R>>,
|
|
merkle: Arc<MerkleUpdater<F, R>>,
|
|
) -> Arc<Self> {
|
|
let endpoint = system
|
|
.netapp
|
|
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
|
|
|
|
let syncer = Arc::new(Self {
|
|
system,
|
|
data,
|
|
merkle,
|
|
add_full_sync_tx: ArcSwapOption::new(None),
|
|
endpoint,
|
|
});
|
|
syncer.endpoint.set_handler(syncer.clone());
|
|
|
|
syncer
|
|
}
|
|
|
|
pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
|
|
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
|
|
self.add_full_sync_tx
|
|
.store(Some(Arc::new(add_full_sync_tx)));
|
|
|
|
bg.spawn_worker(SyncWorker {
|
|
syncer: self.clone(),
|
|
layout_notify: self.system.layout_notify(),
|
|
layout_versions: self.system.cluster_layout().sync_versions(),
|
|
add_full_sync_rx,
|
|
todo: None,
|
|
next_full_sync: Instant::now() + Duration::from_secs(20),
|
|
});
|
|
}
|
|
|
|
pub fn add_full_sync(&self) -> Result<(), Error> {
|
|
let tx = self.add_full_sync_tx.load();
|
|
let tx = tx
|
|
.as_ref()
|
|
.ok_or_message("table sync worker is not running")?;
|
|
tx.send(()).ok_or_message("send error")?;
|
|
Ok(())
|
|
}
|
|
|
|
// ----
|
|
|
|
async fn sync_partition(
|
|
self: &Arc<Self>,
|
|
partition: &SyncPartition,
|
|
must_exit: &mut watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let my_id = self.system.id;
|
|
let retain = partition.storage_nodes.contains(&my_id);
|
|
|
|
if retain {
|
|
debug!(
|
|
"({}) Syncing {:?} with {:?}...",
|
|
F::TABLE_NAME,
|
|
partition,
|
|
partition.storage_nodes
|
|
);
|
|
let mut sync_futures = partition
|
|
.storage_nodes
|
|
.iter()
|
|
.filter(|node| **node != my_id)
|
|
.map(|node| {
|
|
self.clone()
|
|
.do_sync_with(&partition, *node, must_exit.clone())
|
|
})
|
|
.collect::<FuturesUnordered<_>>();
|
|
|
|
let mut n_errors = 0;
|
|
while let Some(r) = sync_futures.next().await {
|
|
if let Err(e) = r {
|
|
n_errors += 1;
|
|
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
|
|
}
|
|
}
|
|
if n_errors > 0 {
|
|
return Err(Error::Message(format!(
|
|
"Sync failed with {} nodes.",
|
|
n_errors
|
|
)));
|
|
}
|
|
} else {
|
|
self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Offload partition: this partition is not something we are storing,
|
|
// so send it out to all other nodes that store it and delete items locally.
|
|
// We don't bother checking if the remote nodes already have the items,
|
|
// we just batch-send everything. Offloading isn't supposed to happen very often.
|
|
// If any of the nodes that are supposed to store the items is unable to
|
|
// save them, we interrupt the process.
|
|
async fn offload_partition(
|
|
self: &Arc<Self>,
|
|
begin: &Hash,
|
|
end: &Hash,
|
|
must_exit: &mut watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let mut counter: usize = 0;
|
|
|
|
while !*must_exit.borrow() {
|
|
let mut items = Vec::new();
|
|
|
|
for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
|
|
let (key, value) = item?;
|
|
items.push((key.to_vec(), Arc::new(ByteBuf::from(value))));
|
|
|
|
if items.len() >= 1024 {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if !items.is_empty() {
|
|
let nodes = self
|
|
.data
|
|
.replication
|
|
.write_nodes(begin)
|
|
.into_iter()
|
|
.collect::<Vec<_>>();
|
|
if nodes.contains(&self.system.id) {
|
|
warn!(
|
|
"({}) Interrupting offload as partitions seem to have changed",
|
|
F::TABLE_NAME
|
|
);
|
|
break;
|
|
}
|
|
if nodes.len() < self.data.replication.write_quorum() {
|
|
return Err(Error::Message(
|
|
"Not offloading as we don't have a quorum of nodes to write to."
|
|
.to_string(),
|
|
));
|
|
}
|
|
|
|
counter += 1;
|
|
info!(
|
|
"({}) Offloading {} items from {:?}..{:?} ({})",
|
|
F::TABLE_NAME,
|
|
items.len(),
|
|
begin,
|
|
end,
|
|
counter
|
|
);
|
|
self.offload_items(&items, &nodes[..]).await?;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn offload_items(
|
|
self: &Arc<Self>,
|
|
items: &[(Vec<u8>, Arc<ByteBuf>)],
|
|
nodes: &[Uuid],
|
|
) -> Result<(), Error> {
|
|
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
|
|
|
for to in nodes.iter() {
|
|
self.data.metrics.sync_items_sent.add(
|
|
values.len() as u64,
|
|
&[
|
|
KeyValue::new("table_name", F::TABLE_NAME),
|
|
KeyValue::new("to", format!("{:?}", to)),
|
|
],
|
|
);
|
|
}
|
|
|
|
self.system
|
|
.rpc_helper()
|
|
.try_call_many(
|
|
&self.endpoint,
|
|
nodes,
|
|
SyncRpc::Items(values),
|
|
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
|
|
)
|
|
.await?;
|
|
|
|
// All remote nodes have written those items, now we can delete them locally
|
|
let mut not_removed = 0;
|
|
for (k, v) in items.iter() {
|
|
if !self.data.delete_if_equal(&k[..], &v[..])? {
|
|
not_removed += 1;
|
|
}
|
|
}
|
|
|
|
if not_removed > 0 {
|
|
debug!("({}) {} items not removed during offload because they changed in between (trying again...)", F::TABLE_NAME, not_removed);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
|
|
// The driver side is only concerned with sending out the item it has
|
|
// and the other side might not have. Receiving items that differ from one
|
|
// side to the other will happen when the other side syncs with us,
|
|
// which they also do regularly.
|
|
|
|
fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
|
|
let key = MerkleNodeKey {
|
|
partition,
|
|
prefix: vec![],
|
|
};
|
|
let node = self.merkle.read_node(&key)?;
|
|
Ok((key, node))
|
|
}
|
|
|
|
async fn do_sync_with(
|
|
self: Arc<Self>,
|
|
partition: &SyncPartition,
|
|
who: Uuid,
|
|
must_exit: watch::Receiver<bool>,
|
|
) -> Result<(), Error> {
|
|
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
|
|
if root_ck.is_empty() {
|
|
debug!(
|
|
"({}) Sync {:?} with {:?}: partition is empty.",
|
|
F::TABLE_NAME,
|
|
partition,
|
|
who
|
|
);
|
|
return Ok(());
|
|
}
|
|
let root_ck_hash = hash_of_merkle_node(&root_ck)?;
|
|
|
|
// Check if they have the same root checksum
|
|
// If so, do nothing.
|
|
let root_resp = self
|
|
.system
|
|
.rpc_helper()
|
|
.call(
|
|
&self.endpoint,
|
|
who,
|
|
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
|
|
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
|
)
|
|
.await?;
|
|
|
|
let mut todo = match root_resp {
|
|
SyncRpc::RootCkDifferent(false) => {
|
|
debug!(
|
|
"({}) Sync {:?} with {:?}: no difference",
|
|
F::TABLE_NAME,
|
|
partition,
|
|
who
|
|
);
|
|
return Ok(());
|
|
}
|
|
SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
|
|
x => {
|
|
return Err(Error::Message(format!(
|
|
"Invalid respone to RootCkHash RPC: {}",
|
|
debug_serialize(x)
|
|
)));
|
|
}
|
|
};
|
|
|
|
let mut todo_items = vec![];
|
|
|
|
while !todo.is_empty() && !*must_exit.borrow() {
|
|
let key = todo.pop_front().unwrap();
|
|
let node = self.merkle.read_node(&key)?;
|
|
|
|
match node {
|
|
MerkleNode::Empty => {
|
|
// They have items we don't have.
|
|
// We don't request those items from them, they will send them.
|
|
// We only bother with pushing items that differ
|
|
}
|
|
MerkleNode::Leaf(ik, ivhash) => {
|
|
// Just send that item directly
|
|
if let Some(val) = self.data.store.get(&ik[..])? {
|
|
if blake2sum(&val[..]) != ivhash {
|
|
debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
|
|
}
|
|
todo_items.push(val.to_vec());
|
|
} else {
|
|
debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
|
|
}
|
|
}
|
|
MerkleNode::Intermediate(l) => {
|
|
// Get Merkle node for this tree position at remote node
|
|
// and compare it with local node
|
|
let remote_node = match self
|
|
.system
|
|
.rpc_helper()
|
|
.call(
|
|
&self.endpoint,
|
|
who,
|
|
SyncRpc::GetNode(key.clone()),
|
|
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
|
)
|
|
.await?
|
|
{
|
|
SyncRpc::Node(_, node) => node,
|
|
x => {
|
|
return Err(Error::Message(format!(
|
|
"Invalid respone to GetNode RPC: {}",
|
|
debug_serialize(x)
|
|
)));
|
|
}
|
|
};
|
|
let int_l2 = match remote_node {
|
|
// If they have an intermediate node at this tree position,
|
|
// we can compare them to find differences
|
|
MerkleNode::Intermediate(l2) => l2,
|
|
// Otherwise, treat it as if they have nothing for this subtree,
|
|
// which will have the consequence of sending them everything
|
|
_ => vec![],
|
|
};
|
|
|
|
let join = join_ordered(&l[..], &int_l2[..]);
|
|
for (p, v1, v2) in join.into_iter() {
|
|
let diff = match (v1, v2) {
|
|
(Some(_), None) | (None, Some(_)) => true,
|
|
(Some(a), Some(b)) => a != b,
|
|
_ => false,
|
|
};
|
|
if diff {
|
|
todo.push_back(key.add_byte(*p));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if todo_items.len() >= 256 {
|
|
self.send_items(who, std::mem::take(&mut todo_items))
|
|
.await?;
|
|
}
|
|
}
|
|
|
|
if !todo_items.is_empty() {
|
|
self.send_items(who, todo_items).await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
|
info!(
|
|
"({}) Sending {} items to {:?}",
|
|
F::TABLE_NAME,
|
|
item_value_list.len(),
|
|
who
|
|
);
|
|
|
|
let values = item_value_list
|
|
.into_iter()
|
|
.map(|x| Arc::new(ByteBuf::from(x)))
|
|
.collect::<Vec<_>>();
|
|
|
|
self.data.metrics.sync_items_sent.add(
|
|
values.len() as u64,
|
|
&[
|
|
KeyValue::new("table_name", F::TABLE_NAME),
|
|
KeyValue::new("to", format!("{:?}", who)),
|
|
],
|
|
);
|
|
|
|
let rpc_resp = self
|
|
.system
|
|
.rpc_helper()
|
|
.call(
|
|
&self.endpoint,
|
|
who,
|
|
SyncRpc::Items(values),
|
|
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
|
)
|
|
.await?;
|
|
if let SyncRpc::Ok = rpc_resp {
|
|
Ok(())
|
|
} else {
|
|
Err(Error::unexpected_rpc_message(rpc_resp))
|
|
}
|
|
}
|
|
}
|
|
|
|
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
|
|
|
|
#[async_trait]
|
|
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
|
|
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
|
|
match message {
|
|
SyncRpc::RootCkHash(range, h) => {
|
|
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
|
|
let hash = hash_of_merkle_node(&root_ck)?;
|
|
Ok(SyncRpc::RootCkDifferent(hash != *h))
|
|
}
|
|
SyncRpc::GetNode(k) => {
|
|
let node = self.merkle.read_node(k)?;
|
|
Ok(SyncRpc::Node(k.clone(), node))
|
|
}
|
|
SyncRpc::Items(items) => {
|
|
self.data.metrics.sync_items_received.add(
|
|
items.len() as u64,
|
|
&[
|
|
KeyValue::new("table_name", F::TABLE_NAME),
|
|
KeyValue::new(
|
|
"from",
|
|
format!("{:?}", Uuid::try_from(from.as_ref()).unwrap()),
|
|
),
|
|
],
|
|
);
|
|
|
|
self.data.update_many(items)?;
|
|
Ok(SyncRpc::Ok)
|
|
}
|
|
m => Err(Error::unexpected_rpc_message(m)),
|
|
}
|
|
}
|
|
}
|
|
|
|
// -------- Sync Worker ---------
|
|
|
|
struct SyncWorker<F: TableSchema, R: TableReplication> {
|
|
syncer: Arc<TableSyncer<F, R>>,
|
|
|
|
layout_notify: Arc<Notify>,
|
|
layout_versions: (u64, u64, u64),
|
|
|
|
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
|
|
next_full_sync: Instant,
|
|
|
|
todo: Option<SyncPartitions>,
|
|
}
|
|
|
|
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
|
|
fn check_add_full_sync(&mut self) {
|
|
let layout_versions = self.syncer.system.cluster_layout().sync_versions();
|
|
if layout_versions != self.layout_versions {
|
|
self.layout_versions = layout_versions;
|
|
info!(
|
|
"({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
|
|
F::TABLE_NAME,
|
|
layout_versions.0,
|
|
layout_versions.1,
|
|
layout_versions.2
|
|
);
|
|
self.add_full_sync();
|
|
}
|
|
}
|
|
|
|
fn add_full_sync(&mut self) {
|
|
let mut partitions = self.syncer.data.replication.sync_partitions();
|
|
info!(
|
|
"{}: Adding full sync for ack layout version {}",
|
|
F::TABLE_NAME,
|
|
partitions.layout_version
|
|
);
|
|
|
|
partitions.partitions.shuffle(&mut thread_rng());
|
|
self.todo = Some(partitions);
|
|
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
|
|
fn name(&self) -> String {
|
|
format!("{} sync", F::TABLE_NAME)
|
|
}
|
|
|
|
fn status(&self) -> WorkerStatus {
|
|
WorkerStatus {
|
|
queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
|
|
..Default::default()
|
|
}
|
|
}
|
|
|
|
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
|
self.check_add_full_sync();
|
|
|
|
if let Some(todo) = &mut self.todo {
|
|
let partition = todo.partitions.pop().unwrap();
|
|
|
|
// process partition
|
|
if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
|
|
error!(
|
|
"{}: Failed to sync partition {:?}: {}",
|
|
F::TABLE_NAME,
|
|
partition,
|
|
e
|
|
);
|
|
// if error, put partition back at the other side of the queue,
|
|
// so that other partitions will be tried in the meantime
|
|
todo.partitions.insert(0, partition);
|
|
// TODO: returning an error here will cause the background job worker
|
|
// to delay this task for some time, but maybe we don't want to
|
|
// delay it if there are lots of failures from nodes that are gone
|
|
// (we also don't want zero delays as that will cause lots of useless retries)
|
|
return Err(e);
|
|
}
|
|
|
|
if todo.partitions.is_empty() {
|
|
info!(
|
|
"{}: Completed full sync for ack layout version {}",
|
|
F::TABLE_NAME,
|
|
todo.layout_version
|
|
);
|
|
self.syncer
|
|
.system
|
|
.layout_manager
|
|
.sync_table_until(F::TABLE_NAME, todo.layout_version);
|
|
self.todo = None;
|
|
}
|
|
|
|
Ok(WorkerState::Busy)
|
|
} else {
|
|
Ok(WorkerState::Idle)
|
|
}
|
|
}
|
|
|
|
async fn wait_for_work(&mut self) -> WorkerState {
|
|
select! {
|
|
s = self.add_full_sync_rx.recv() => {
|
|
if let Some(()) = s {
|
|
self.add_full_sync();
|
|
}
|
|
},
|
|
_ = self.layout_notify.notified() => {
|
|
self.check_add_full_sync();
|
|
},
|
|
_ = tokio::time::sleep_until(self.next_full_sync.into()) => {
|
|
self.add_full_sync();
|
|
}
|
|
}
|
|
match self.todo.is_some() {
|
|
true => WorkerState::Busy,
|
|
false => WorkerState::Idle,
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---- UTIL ----
|
|
|
|
fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
|
|
Ok(blake2sum(&nonversioned_encode(x)?[..]))
|
|
}
|
|
|
|
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
|
|
x: &'a [(K, V1)],
|
|
y: &'a [(K, V2)],
|
|
) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
|
|
let mut ret = vec![];
|
|
let mut i = 0;
|
|
let mut j = 0;
|
|
while i < x.len() || j < y.len() {
|
|
if i < x.len() && j < y.len() && x[i].0 == y[j].0 {
|
|
ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1)));
|
|
i += 1;
|
|
j += 1;
|
|
} else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) {
|
|
ret.push((&x[i].0, Some(&x[i].1), None));
|
|
i += 1;
|
|
} else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) {
|
|
ret.push((&y[j].0, None, Some(&y[j].1)));
|
|
j += 1;
|
|
} else {
|
|
unreachable!();
|
|
}
|
|
}
|
|
ret
|
|
}
|