(not well tested) use merkle tree for sync
This commit is contained in:
parent
94f3d28774
commit
046b649bcc
11 changed files with 765 additions and 985 deletions
|
@ -25,26 +25,11 @@ impl Repair {
|
||||||
|
|
||||||
if todo(RepairWhat::Tables) {
|
if todo(RepairWhat::Tables) {
|
||||||
info!("Launching a full sync of tables");
|
info!("Launching a full sync of tables");
|
||||||
self.garage
|
self.garage.bucket_table.syncer.add_full_sync();
|
||||||
.bucket_table
|
self.garage.object_table.syncer.add_full_sync();
|
||||||
.syncer
|
self.garage.version_table.syncer.add_full_sync();
|
||||||
.add_full_scan();
|
self.garage.block_ref_table.syncer.add_full_sync();
|
||||||
self.garage
|
self.garage.key_table.syncer.add_full_sync();
|
||||||
.object_table
|
|
||||||
.syncer
|
|
||||||
.add_full_scan();
|
|
||||||
self.garage
|
|
||||||
.version_table
|
|
||||||
.syncer
|
|
||||||
.add_full_scan();
|
|
||||||
self.garage
|
|
||||||
.block_ref_table
|
|
||||||
.syncer
|
|
||||||
.add_full_scan();
|
|
||||||
self.garage
|
|
||||||
.key_table
|
|
||||||
.syncer
|
|
||||||
.add_full_scan();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: wait for full sync to finish before proceeding to the rest?
|
// TODO: wait for full sync to finish before proceeding to the rest?
|
||||||
|
@ -78,7 +63,9 @@ impl Repair {
|
||||||
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
let mut pos = vec![];
|
let mut pos = vec![];
|
||||||
|
|
||||||
while let Some((item_key, item_bytes)) = self.garage.version_table.data.store.get_gt(&pos)? {
|
while let Some((item_key, item_bytes)) =
|
||||||
|
self.garage.version_table.data.store.get_gt(&pos)?
|
||||||
|
{
|
||||||
pos = item_key.to_vec();
|
pos = item_key.to_vec();
|
||||||
|
|
||||||
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
|
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
|
||||||
|
@ -126,7 +113,9 @@ impl Repair {
|
||||||
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
let mut pos = vec![];
|
let mut pos = vec![];
|
||||||
|
|
||||||
while let Some((item_key, item_bytes)) = self.garage.block_ref_table.data.store.get_gt(&pos)? {
|
while let Some((item_key, item_bytes)) =
|
||||||
|
self.garage.block_ref_table.data.store.get_gt(&pos)?
|
||||||
|
{
|
||||||
pos = item_key.to_vec();
|
pos = item_key.to_vec();
|
||||||
|
|
||||||
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
|
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
|
||||||
|
|
|
@ -7,8 +7,8 @@ use garage_rpc::membership::System;
|
||||||
use garage_rpc::rpc_client::RpcHttpClient;
|
use garage_rpc::rpc_client::RpcHttpClient;
|
||||||
use garage_rpc::rpc_server::RpcServer;
|
use garage_rpc::rpc_server::RpcServer;
|
||||||
|
|
||||||
use garage_table::replication::sharded::*;
|
|
||||||
use garage_table::replication::fullcopy::*;
|
use garage_table::replication::fullcopy::*;
|
||||||
|
use garage_table::replication::sharded::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use crate::block::*;
|
use crate::block::*;
|
||||||
|
|
|
@ -183,7 +183,7 @@ impl Ring {
|
||||||
|
|
||||||
let partition_top =
|
let partition_top =
|
||||||
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
||||||
assert!(partition_top & PARTITION_MASK_U16 == top & PARTITION_MASK_U16);
|
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
||||||
|
|
||||||
assert!(n <= partition.nodes.len());
|
assert!(n <= partition.nodes.len());
|
||||||
partition.nodes[..n].iter().cloned().collect::<Vec<_>>()
|
partition.nodes[..n].iter().cloned().collect::<Vec<_>>()
|
||||||
|
|
|
@ -1,16 +1,16 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use sled::Transactional;
|
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
|
use sled::Transactional;
|
||||||
|
|
||||||
|
use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
use garage_util::background::BackgroundRunner;
|
|
||||||
|
|
||||||
use crate::schema::*;
|
|
||||||
use crate::merkle::*;
|
|
||||||
use crate::crdt::CRDT;
|
use crate::crdt::CRDT;
|
||||||
|
use crate::merkle::*;
|
||||||
|
use crate::schema::*;
|
||||||
|
|
||||||
pub struct TableData<F: TableSchema> {
|
pub struct TableData<F: TableSchema> {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
@ -20,7 +20,10 @@ pub struct TableData<F: TableSchema> {
|
||||||
pub(crate) merkle_updater: Arc<MerkleUpdater>,
|
pub(crate) merkle_updater: Arc<MerkleUpdater>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> TableData<F> where F: TableSchema {
|
impl<F> TableData<F>
|
||||||
|
where
|
||||||
|
F: TableSchema,
|
||||||
|
{
|
||||||
pub fn new(
|
pub fn new(
|
||||||
name: String,
|
name: String,
|
||||||
instance: F,
|
instance: F,
|
||||||
|
@ -45,7 +48,7 @@ impl<F> TableData<F> where F: TableSchema {
|
||||||
merkle_tree_store,
|
merkle_tree_store,
|
||||||
);
|
);
|
||||||
|
|
||||||
Arc::new(Self{
|
Arc::new(Self {
|
||||||
name,
|
name,
|
||||||
instance,
|
instance,
|
||||||
store,
|
store,
|
||||||
|
|
|
@ -7,11 +7,11 @@ pub mod crdt;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
|
pub mod data;
|
||||||
pub mod merkle;
|
pub mod merkle;
|
||||||
pub mod replication;
|
pub mod replication;
|
||||||
pub mod data;
|
pub mod sync;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
pub mod table_sync;
|
|
||||||
|
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
pub use table::*;
|
pub use table::*;
|
||||||
|
|
|
@ -15,6 +15,19 @@ use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
pub type MerklePartition = [u8; 2];
|
||||||
|
|
||||||
|
pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
|
||||||
|
let mut partition_pos = [0u8; 32];
|
||||||
|
partition_pos[0..2].copy_from_slice(&p[..]);
|
||||||
|
partition_pos.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
|
||||||
|
p.map(hash_of_merkle_partition)
|
||||||
|
.unwrap_or([0xFFu8; 32].into())
|
||||||
|
}
|
||||||
|
|
||||||
// This modules partitions the data in 2**16 partitions, based on the top
|
// This modules partitions the data in 2**16 partitions, based on the top
|
||||||
// 16 bits (two bytes) of item's partition keys' hashes.
|
// 16 bits (two bytes) of item's partition keys' hashes.
|
||||||
// It builds one Merkle tree for each of these 2**16 partitions.
|
// It builds one Merkle tree for each of these 2**16 partitions.
|
||||||
|
@ -37,10 +50,10 @@ pub(crate) struct MerkleUpdater {
|
||||||
empty_node_hash: Hash,
|
empty_node_hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Serialize, Deserialize)]
|
||||||
pub struct MerkleNodeKey {
|
pub struct MerkleNodeKey {
|
||||||
// partition: first 16 bits (two bytes) of the partition_key's hash
|
// partition: first 16 bits (two bytes) of the partition_key's hash
|
||||||
pub partition: [u8; 2],
|
pub partition: MerklePartition,
|
||||||
|
|
||||||
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
|
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
|
||||||
pub prefix: Vec<u8>,
|
pub prefix: Vec<u8>,
|
||||||
|
@ -214,8 +227,11 @@ impl MerkleUpdater {
|
||||||
// insertion and replace current node by an intermediary node
|
// insertion and replace current node by an intermediary node
|
||||||
let (pos1, h1) = {
|
let (pos1, h1) = {
|
||||||
let key2 = key.next_key(blake2sum(&exlf_key[..]));
|
let key2 = key.next_key(blake2sum(&exlf_key[..]));
|
||||||
let subhash =
|
let subhash = self.put_node_txn(
|
||||||
self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?;
|
tx,
|
||||||
|
&key2,
|
||||||
|
&MerkleNode::Leaf(exlf_key, exlf_hash),
|
||||||
|
)?;
|
||||||
(key2.prefix[i], subhash)
|
(key2.prefix[i], subhash)
|
||||||
};
|
};
|
||||||
let (pos2, h2) = {
|
let (pos2, h2) = {
|
||||||
|
@ -280,14 +296,11 @@ impl MerkleUpdater {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Access a node in the Merkle tree, used by the sync protocol
|
// Access a node in the Merkle tree, used by the sync protocol
|
||||||
pub(crate) fn read_node(
|
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
|
||||||
&self,
|
|
||||||
k: &MerkleNodeKey,
|
|
||||||
) -> Result<MerkleNode, Error> {
|
|
||||||
let ent = self.merkle_tree.get(k.encode())?;
|
let ent = self.merkle_tree.get(k.encode())?;
|
||||||
match ent {
|
match ent {
|
||||||
None => Ok(MerkleNode::Empty),
|
None => Ok(MerkleNode::Empty),
|
||||||
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?)
|
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -341,29 +354,75 @@ fn test_intermediate_aux() {
|
||||||
let mut v = vec![];
|
let mut v = vec![];
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
|
intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
|
||||||
assert!(v == vec![(12u8, [12u8; 32].into())]);
|
assert_eq!(v, vec![(12u8, [12u8; 32].into())]);
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
|
intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
|
||||||
assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
|
intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(12u8, [12u8; 32].into()),
|
||||||
|
(42u8, [42u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
|
intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(12u8, [8u8; 32].into()),
|
||||||
|
(42u8, [42u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
|
intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(6u8, [6u8; 32].into()),
|
||||||
|
(12u8, [8u8; 32].into()),
|
||||||
|
(42u8, [42u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_rm_child(&mut v, 42u8);
|
intermediate_rm_child(&mut v, 42u8);
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(6u8, [6u8; 32].into()),
|
||||||
|
(12u8, [8u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_rm_child(&mut v, 11u8);
|
intermediate_rm_child(&mut v, 11u8);
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(6u8, [6u8; 32].into()),
|
||||||
|
(12u8, [8u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
intermediate_rm_child(&mut v, 6u8);
|
intermediate_rm_child(&mut v, 6u8);
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
|
assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
|
||||||
|
|
||||||
intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
|
intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
|
||||||
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]);
|
assert_eq!(
|
||||||
|
v,
|
||||||
|
vec![
|
||||||
|
(4u8, [4u8; 32].into()),
|
||||||
|
(6u8, [7u8; 32].into()),
|
||||||
|
(12u8, [8u8; 32].into())
|
||||||
|
]
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,6 @@ impl TableReplication for TableFullReplication {
|
||||||
fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
|
fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
ret.push([0u8; 32].into());
|
ret.push([0u8; 32].into());
|
||||||
ret.push([0xFFu8; 32].into());
|
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,11 +44,13 @@ impl TableReplication for TableShardedReplication {
|
||||||
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
|
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
|
|
||||||
ret.push([0u8; 32].into());
|
|
||||||
for entry in ring.ring.iter() {
|
for entry in ring.ring.iter() {
|
||||||
ret.push(entry.location);
|
ret.push(entry.location);
|
||||||
}
|
}
|
||||||
ret.push([0xFFu8; 32].into());
|
if ret.len() > 0 {
|
||||||
|
assert_eq!(ret[0], [0u8; 32].into());
|
||||||
|
}
|
||||||
|
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
632
src/table/sync.rs
Normal file
632
src/table/sync.rs
Normal file
|
@ -0,0 +1,632 @@
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::convert::TryInto;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use futures::future::join_all;
|
||||||
|
use futures::{pin_mut, select};
|
||||||
|
use futures_util::future::*;
|
||||||
|
use futures_util::stream::*;
|
||||||
|
use rand::Rng;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_bytes::ByteBuf;
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
|
use garage_rpc::ring::Ring;
|
||||||
|
use garage_util::data::*;
|
||||||
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
use crate::data::*;
|
||||||
|
use crate::merkle::*;
|
||||||
|
use crate::replication::*;
|
||||||
|
use crate::*;
|
||||||
|
|
||||||
|
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
|
// Do anti-entropy every 10 minutes
|
||||||
|
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
|
||||||
|
|
||||||
|
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
||||||
|
data: Arc<TableData<F>>,
|
||||||
|
aux: Arc<TableAux<F, R>>,
|
||||||
|
|
||||||
|
todo: Mutex<SyncTodo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type RootCk = Vec<(MerklePartition, Hash)>;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||||
|
pub struct PartitionRange {
|
||||||
|
begin: MerklePartition,
|
||||||
|
// if end is None, go all the way to partition 0xFFFF included
|
||||||
|
end: Option<MerklePartition>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub(crate) enum SyncRPC {
|
||||||
|
RootCkHash(PartitionRange, Hash),
|
||||||
|
RootCkList(PartitionRange, RootCk),
|
||||||
|
CkNoDifference,
|
||||||
|
GetNode(MerkleNodeKey),
|
||||||
|
Node(MerkleNodeKey, MerkleNode),
|
||||||
|
Items(Vec<Arc<ByteBuf>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SyncTodo {
|
||||||
|
todo: Vec<TodoPartition>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct TodoPartition {
|
||||||
|
range: PartitionRange,
|
||||||
|
|
||||||
|
// Are we a node that stores this partition or not?
|
||||||
|
retain: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F, R> TableSyncer<F, R>
|
||||||
|
where
|
||||||
|
F: TableSchema + 'static,
|
||||||
|
R: TableReplication + 'static,
|
||||||
|
{
|
||||||
|
pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> {
|
||||||
|
let todo = SyncTodo { todo: vec![] };
|
||||||
|
|
||||||
|
let syncer = Arc::new(Self {
|
||||||
|
data: data.clone(),
|
||||||
|
aux: aux.clone(),
|
||||||
|
todo: Mutex::new(todo),
|
||||||
|
});
|
||||||
|
|
||||||
|
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let s1 = syncer.clone();
|
||||||
|
aux.system.background.spawn_worker(
|
||||||
|
format!("table sync watcher for {}", data.name),
|
||||||
|
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
||||||
|
);
|
||||||
|
|
||||||
|
let s2 = syncer.clone();
|
||||||
|
aux.system.background.spawn_worker(
|
||||||
|
format!("table syncer for {}", data.name),
|
||||||
|
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
||||||
|
);
|
||||||
|
|
||||||
|
let s3 = syncer.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::time::delay_for(Duration::from_secs(20)).await;
|
||||||
|
s3.add_full_sync();
|
||||||
|
});
|
||||||
|
|
||||||
|
syncer
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn watcher_task(
|
||||||
|
self: Arc<Self>,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
mut busy_rx: mpsc::UnboundedReceiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
|
||||||
|
let mut nothing_to_do_since = Some(Instant::now());
|
||||||
|
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let s_ring_recv = ring_recv.recv().fuse();
|
||||||
|
let s_busy = busy_rx.recv().fuse();
|
||||||
|
let s_must_exit = must_exit.recv().fuse();
|
||||||
|
let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
|
||||||
|
pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
|
||||||
|
|
||||||
|
select! {
|
||||||
|
new_ring_r = s_ring_recv => {
|
||||||
|
if new_ring_r.is_some() {
|
||||||
|
debug!("({}) Adding ring difference to syncer todo list", self.data.name);
|
||||||
|
self.add_full_sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
busy_opt = s_busy => {
|
||||||
|
if let Some(busy) = busy_opt {
|
||||||
|
if busy {
|
||||||
|
nothing_to_do_since = None;
|
||||||
|
} else {
|
||||||
|
if nothing_to_do_since.is_none() {
|
||||||
|
nothing_to_do_since = Some(Instant::now());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
must_exit_v = s_must_exit => {
|
||||||
|
if must_exit_v.unwrap_or(false) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = s_timeout => {
|
||||||
|
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
|
||||||
|
nothing_to_do_since = None;
|
||||||
|
debug!("({}) Adding full sync to syncer todo list", self.data.name);
|
||||||
|
self.add_full_sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_full_sync(&self) {
|
||||||
|
self.todo
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.add_full_sync(&self.data, &self.aux);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn syncer_task(
|
||||||
|
self: Arc<Self>,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
busy_tx: mpsc::UnboundedSender<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let task = self.todo.lock().unwrap().pop_task();
|
||||||
|
if let Some(partition) = task {
|
||||||
|
busy_tx.send(true)?;
|
||||||
|
let res = self
|
||||||
|
.clone()
|
||||||
|
.sync_partition(&partition, &mut must_exit)
|
||||||
|
.await;
|
||||||
|
if let Err(e) = res {
|
||||||
|
warn!(
|
||||||
|
"({}) Error while syncing {:?}: {}",
|
||||||
|
self.data.name, partition, e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
busy_tx.send(false)?;
|
||||||
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_partition(
|
||||||
|
self: Arc<Self>,
|
||||||
|
partition: &TodoPartition,
|
||||||
|
must_exit: &mut watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
if partition.retain {
|
||||||
|
let my_id = self.aux.system.id;
|
||||||
|
|
||||||
|
let nodes = self
|
||||||
|
.aux
|
||||||
|
.replication
|
||||||
|
.write_nodes(
|
||||||
|
&hash_of_merkle_partition(partition.range.begin),
|
||||||
|
&self.aux.system,
|
||||||
|
)
|
||||||
|
.into_iter()
|
||||||
|
.filter(|node| *node != my_id)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"({}) Syncing {:?} with {:?}...",
|
||||||
|
self.data.name, partition, nodes
|
||||||
|
);
|
||||||
|
let mut sync_futures = nodes
|
||||||
|
.iter()
|
||||||
|
.map(|node| {
|
||||||
|
self.clone()
|
||||||
|
.do_sync_with(partition.clone(), *node, must_exit.clone())
|
||||||
|
})
|
||||||
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
|
let mut n_errors = 0;
|
||||||
|
while let Some(r) = sync_futures.next().await {
|
||||||
|
if let Err(e) = r {
|
||||||
|
n_errors += 1;
|
||||||
|
warn!("({}) Sync error: {}", self.data.name, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if n_errors > self.aux.replication.max_write_errors() {
|
||||||
|
return Err(Error::Message(format!(
|
||||||
|
"Sync failed with too many nodes (should have been: {:?}).",
|
||||||
|
nodes
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.offload_partition(
|
||||||
|
&hash_of_merkle_partition(partition.range.begin),
|
||||||
|
&hash_of_merkle_partition_opt(partition.range.end),
|
||||||
|
must_exit,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Offload partition: this partition is not something we are storing,
|
||||||
|
// so send it out to all other nodes that store it and delete items locally.
|
||||||
|
// We don't bother checking if the remote nodes already have the items,
|
||||||
|
// we just batch-send everything. Offloading isn't supposed to happen very often.
|
||||||
|
// If any of the nodes that are supposed to store the items is unable to
|
||||||
|
// save them, we interrupt the process.
|
||||||
|
async fn offload_partition(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
begin: &Hash,
|
||||||
|
end: &Hash,
|
||||||
|
must_exit: &mut watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut counter: usize = 0;
|
||||||
|
|
||||||
|
while !*must_exit.borrow() {
|
||||||
|
let mut items = Vec::new();
|
||||||
|
|
||||||
|
for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
|
||||||
|
let (key, value) = item?;
|
||||||
|
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
|
||||||
|
|
||||||
|
if items.len() >= 1024 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if items.len() > 0 {
|
||||||
|
let nodes = self
|
||||||
|
.aux
|
||||||
|
.replication
|
||||||
|
.write_nodes(&begin, &self.aux.system)
|
||||||
|
.into_iter()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if nodes.contains(&self.aux.system.id) {
|
||||||
|
warn!("Interrupting offload as partitions seem to have changed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
counter += 1;
|
||||||
|
debug!(
|
||||||
|
"Offloading {} items from {:?}..{:?} ({})",
|
||||||
|
items.len(),
|
||||||
|
begin,
|
||||||
|
end,
|
||||||
|
counter
|
||||||
|
);
|
||||||
|
self.offload_items(&items, &nodes[..]).await?;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn offload_items(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
|
||||||
|
nodes: &[UUID],
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
||||||
|
let update_msg = Arc::new(TableRPC::<F>::Update(values));
|
||||||
|
|
||||||
|
for res in join_all(nodes.iter().map(|to| {
|
||||||
|
self.aux
|
||||||
|
.rpc_client
|
||||||
|
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
res?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// All remote nodes have written those items, now we can delete them locally
|
||||||
|
let mut not_removed = 0;
|
||||||
|
for (k, v) in items.iter() {
|
||||||
|
if !self.data.delete_if_equal(&k[..], &v[..])? {
|
||||||
|
not_removed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if not_removed > 0 {
|
||||||
|
debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
|
||||||
|
|
||||||
|
fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
|
||||||
|
let begin = u16::from_be_bytes(range.begin);
|
||||||
|
let range_iter = match range.end {
|
||||||
|
Some(end) => {
|
||||||
|
let end = u16::from_be_bytes(end);
|
||||||
|
begin..=(end - 1)
|
||||||
|
}
|
||||||
|
None => begin..=0xFFFF,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut ret = vec![];
|
||||||
|
for i in range_iter {
|
||||||
|
let key = MerkleNodeKey {
|
||||||
|
partition: u16::to_be_bytes(i),
|
||||||
|
prefix: vec![],
|
||||||
|
};
|
||||||
|
match self.data.merkle_updater.read_node(&key)? {
|
||||||
|
MerkleNode::Empty => (),
|
||||||
|
x => {
|
||||||
|
ret.push((key.partition, hash_of(&x)?));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn do_sync_with(
|
||||||
|
self: Arc<Self>,
|
||||||
|
partition: TodoPartition,
|
||||||
|
who: UUID,
|
||||||
|
must_exit: watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let root_ck = self.get_root_ck(partition.range)?;
|
||||||
|
let root_ck_hash = hash_of(&root_ck)?;
|
||||||
|
|
||||||
|
// If their root checksum has level > than us, use that as a reference
|
||||||
|
let root_resp = self
|
||||||
|
.aux
|
||||||
|
.rpc_client
|
||||||
|
.call(
|
||||||
|
who,
|
||||||
|
TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)),
|
||||||
|
TABLE_SYNC_RPC_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut todo = match root_resp {
|
||||||
|
TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => {
|
||||||
|
debug!(
|
||||||
|
"({}) Sync {:?} with {:?}: no difference",
|
||||||
|
self.data.name, partition, who
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => {
|
||||||
|
let join = join_ordered(&root_ck[..], &their_root_ck[..]);
|
||||||
|
let mut todo = VecDeque::new();
|
||||||
|
for (p, v1, v2) in join.iter() {
|
||||||
|
let diff = match (v1, v2) {
|
||||||
|
(Some(_), None) | (None, Some(_)) => true,
|
||||||
|
(Some(a), Some(b)) => a != b,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
if diff {
|
||||||
|
todo.push_back(MerkleNodeKey {
|
||||||
|
partition: **p,
|
||||||
|
prefix: vec![],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!(
|
||||||
|
"({}) Sync {:?} with {:?}: todo.len() = {}",
|
||||||
|
self.data.name,
|
||||||
|
partition,
|
||||||
|
who,
|
||||||
|
todo.len()
|
||||||
|
);
|
||||||
|
todo
|
||||||
|
}
|
||||||
|
x => {
|
||||||
|
return Err(Error::Message(format!(
|
||||||
|
"Invalid respone to RootCkHash RPC: {}",
|
||||||
|
debug_serialize(x)
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut todo_items = vec![];
|
||||||
|
|
||||||
|
while !todo.is_empty() && !*must_exit.borrow() {
|
||||||
|
let key = todo.pop_front().unwrap();
|
||||||
|
let node = self.data.merkle_updater.read_node(&key)?;
|
||||||
|
|
||||||
|
match node {
|
||||||
|
MerkleNode::Empty => {
|
||||||
|
// They have items we don't have.
|
||||||
|
// We don't request those items from them, they will send them.
|
||||||
|
// We only bother with pushing items that differ
|
||||||
|
}
|
||||||
|
MerkleNode::Leaf(ik, _) => {
|
||||||
|
// Just send that item directly
|
||||||
|
if let Some(val) = self.data.store.get(ik)? {
|
||||||
|
todo_items.push(val.to_vec());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MerkleNode::Intermediate(l) => {
|
||||||
|
let remote_node = match self
|
||||||
|
.aux
|
||||||
|
.rpc_client
|
||||||
|
.call(
|
||||||
|
who,
|
||||||
|
TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
|
||||||
|
TABLE_SYNC_RPC_TIMEOUT,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node,
|
||||||
|
x => {
|
||||||
|
return Err(Error::Message(format!(
|
||||||
|
"Invalid respone to GetNode RPC: {}",
|
||||||
|
debug_serialize(x)
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let int_l2 = match remote_node {
|
||||||
|
MerkleNode::Intermediate(l2) => l2,
|
||||||
|
_ => vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
let join = join_ordered(&l[..], &int_l2[..]);
|
||||||
|
for (p, v1, v2) in join.into_iter() {
|
||||||
|
let diff = match (v1, v2) {
|
||||||
|
(Some(_), None) | (None, Some(_)) => true,
|
||||||
|
(Some(a), Some(b)) => a != b,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
if diff {
|
||||||
|
todo.push_back(key.add_byte(*p));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if todo_items.len() >= 256 {
|
||||||
|
self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !todo_items.is_empty() {
|
||||||
|
self.send_items(who, todo_items).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
||||||
|
info!(
|
||||||
|
"({}) Sending {} items to {:?}",
|
||||||
|
self.data.name,
|
||||||
|
item_list.len(),
|
||||||
|
who
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut values = vec![];
|
||||||
|
for item in item_list.iter() {
|
||||||
|
if let Some(v) = self.data.store.get(&item[..])? {
|
||||||
|
values.push(Arc::new(ByteBuf::from(v.as_ref())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let rpc_resp = self
|
||||||
|
.aux
|
||||||
|
.rpc_client
|
||||||
|
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
|
||||||
|
.await?;
|
||||||
|
if let TableRPC::<F>::Ok = rpc_resp {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(Error::Message(format!(
|
||||||
|
"Unexpected response to RPC Update: {}",
|
||||||
|
debug_serialize(&rpc_resp)
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
|
||||||
|
|
||||||
|
pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
|
||||||
|
match message {
|
||||||
|
SyncRPC::RootCkHash(range, h) => {
|
||||||
|
let root_ck = self.get_root_ck(*range)?;
|
||||||
|
let hash = hash_of(&root_ck)?;
|
||||||
|
if hash == *h {
|
||||||
|
Ok(SyncRPC::CkNoDifference)
|
||||||
|
} else {
|
||||||
|
Ok(SyncRPC::RootCkList(*range, root_ck))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SyncRPC::GetNode(k) => {
|
||||||
|
let node = self.data.merkle_updater.read_node(&k)?;
|
||||||
|
Ok(SyncRPC::Node(k.clone(), node))
|
||||||
|
}
|
||||||
|
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SyncTodo {
|
||||||
|
fn add_full_sync<F: TableSchema, R: TableReplication>(
|
||||||
|
&mut self,
|
||||||
|
data: &TableData<F>,
|
||||||
|
aux: &TableAux<F, R>,
|
||||||
|
) {
|
||||||
|
let my_id = aux.system.id;
|
||||||
|
|
||||||
|
self.todo.clear();
|
||||||
|
|
||||||
|
let ring = aux.system.ring.borrow().clone();
|
||||||
|
let split_points = aux.replication.split_points(&ring);
|
||||||
|
|
||||||
|
for i in 0..split_points.len() {
|
||||||
|
let begin: MerklePartition = {
|
||||||
|
let b = split_points[i];
|
||||||
|
assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
|
||||||
|
b.as_slice()[..2].try_into().unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
|
let end: Option<MerklePartition> = if i + 1 < split_points.len() {
|
||||||
|
let e = split_points[i + 1];
|
||||||
|
assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
|
||||||
|
Some(e.as_slice()[..2].try_into().unwrap())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let begin_hash = hash_of_merkle_partition(begin);
|
||||||
|
let end_hash = hash_of_merkle_partition_opt(end);
|
||||||
|
|
||||||
|
let nodes = aux.replication.replication_nodes(&begin_hash, &ring);
|
||||||
|
|
||||||
|
let retain = nodes.contains(&my_id);
|
||||||
|
if !retain {
|
||||||
|
// Check if we have some data to send, otherwise skip
|
||||||
|
if data.store.range(begin_hash..end_hash).next().is_none() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.todo.push(TodoPartition {
|
||||||
|
range: PartitionRange { begin, end },
|
||||||
|
retain,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pop_task(&mut self) -> Option<TodoPartition> {
|
||||||
|
if self.todo.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
|
||||||
|
if i == self.todo.len() - 1 {
|
||||||
|
self.todo.pop()
|
||||||
|
} else {
|
||||||
|
let replacement = self.todo.pop().unwrap();
|
||||||
|
let ret = std::mem::replace(&mut self.todo[i], replacement);
|
||||||
|
Some(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
|
||||||
|
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
|
||||||
|
x: &'a [(K, V1)],
|
||||||
|
y: &'a [(K, V2)],
|
||||||
|
) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
let mut i = 0;
|
||||||
|
let mut j = 0;
|
||||||
|
while i < x.len() || j < y.len() {
|
||||||
|
if i < x.len() && j < y.len() && x[i].0 == y[j].0 {
|
||||||
|
ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1)));
|
||||||
|
i += 1;
|
||||||
|
j += 1;
|
||||||
|
} else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) {
|
||||||
|
ret.push((&x[i].0, Some(&x[i].1), None));
|
||||||
|
i += 1;
|
||||||
|
} else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) {
|
||||||
|
ret.push((&x[i].0, None, Some(&y[j].1)));
|
||||||
|
j += 1;
|
||||||
|
} else {
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
|
@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*;
|
||||||
|
|
||||||
use crate::crdt::CRDT;
|
use crate::crdt::CRDT;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::schema::*;
|
|
||||||
use crate::table_sync::*;
|
|
||||||
use crate::replication::*;
|
use crate::replication::*;
|
||||||
|
use crate::schema::*;
|
||||||
|
use crate::sync::*;
|
||||||
|
|
||||||
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
|
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
@ -50,7 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
|
||||||
|
|
||||||
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
|
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
|
||||||
|
|
||||||
|
|
||||||
impl<F, R> Table<F, R>
|
impl<F, R> Table<F, R>
|
||||||
where
|
where
|
||||||
F: TableSchema + 'static,
|
F: TableSchema + 'static,
|
||||||
|
@ -69,29 +68,17 @@ where
|
||||||
let rpc_path = format!("table_{}", name);
|
let rpc_path = format!("table_{}", name);
|
||||||
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
|
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
|
||||||
|
|
||||||
let data = TableData::new(
|
let data = TableData::new(name, instance, db, system.background.clone());
|
||||||
name,
|
|
||||||
instance,
|
|
||||||
db,
|
|
||||||
system.background.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let aux = Arc::new(TableAux{
|
let aux = Arc::new(TableAux {
|
||||||
system,
|
system,
|
||||||
replication,
|
replication,
|
||||||
rpc_client,
|
rpc_client,
|
||||||
});
|
});
|
||||||
|
|
||||||
let syncer = TableSyncer::launch(
|
let syncer = TableSyncer::launch(data.clone(), aux.clone());
|
||||||
data.clone(),
|
|
||||||
aux.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let table = Arc::new(Self {
|
let table = Arc::new(Self { data, aux, syncer });
|
||||||
data,
|
|
||||||
aux,
|
|
||||||
syncer,
|
|
||||||
});
|
|
||||||
|
|
||||||
table.clone().register_handler(rpc_server, rpc_path);
|
table.clone().register_handler(rpc_server, rpc_path);
|
||||||
|
|
||||||
|
@ -106,7 +93,8 @@ where
|
||||||
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
|
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
|
||||||
let rpc = TableRPC::<F>::Update(vec![e_enc]);
|
let rpc = TableRPC::<F>::Update(vec![e_enc]);
|
||||||
|
|
||||||
self.aux.rpc_client
|
self.aux
|
||||||
|
.rpc_client
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
&who[..],
|
||||||
rpc,
|
rpc,
|
||||||
|
@ -135,7 +123,11 @@ where
|
||||||
let call_futures = call_list.drain().map(|(node, entries)| async move {
|
let call_futures = call_list.drain().map(|(node, entries)| async move {
|
||||||
let rpc = TableRPC::<F>::Update(entries);
|
let rpc = TableRPC::<F>::Update(entries);
|
||||||
|
|
||||||
let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
|
let resp = self
|
||||||
|
.aux
|
||||||
|
.rpc_client
|
||||||
|
.call(node, rpc, TABLE_RPC_TIMEOUT)
|
||||||
|
.await?;
|
||||||
Ok::<_, Error>((node, resp))
|
Ok::<_, Error>((node, resp))
|
||||||
});
|
});
|
||||||
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
|
||||||
|
@ -200,7 +192,8 @@ where
|
||||||
if not_all_same {
|
if not_all_same {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
let ent2 = ret_entry.clone();
|
let ent2 = ret_entry.clone();
|
||||||
self.aux.system
|
self.aux
|
||||||
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
|
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
|
||||||
}
|
}
|
||||||
|
@ -221,7 +214,8 @@ where
|
||||||
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
|
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
|
||||||
|
|
||||||
let resps = self
|
let resps = self
|
||||||
.aux.rpc_client
|
.aux
|
||||||
|
.rpc_client
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
&who[..],
|
||||||
rpc,
|
rpc,
|
||||||
|
@ -276,7 +270,8 @@ where
|
||||||
|
|
||||||
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
|
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
|
||||||
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
|
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
|
||||||
self.aux.rpc_client
|
self.aux
|
||||||
|
.rpc_client
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
&who[..],
|
||||||
TableRPC::<F>::Update(vec![what_enc]),
|
TableRPC::<F>::Update(vec![what_enc]),
|
||||||
|
@ -296,7 +291,8 @@ where
|
||||||
});
|
});
|
||||||
|
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
self.aux.rpc_client
|
self.aux
|
||||||
|
.rpc_client
|
||||||
.set_local_handler(self.aux.system.id, move |msg| {
|
.set_local_handler(self.aux.system.id, move |msg| {
|
||||||
let self2 = self2.clone();
|
let self2 = self2.clone();
|
||||||
async move { self2.handle(&msg).await }
|
async move { self2.handle(&msg).await }
|
||||||
|
@ -318,9 +314,7 @@ where
|
||||||
Ok(TableRPC::Ok)
|
Ok(TableRPC::Ok)
|
||||||
}
|
}
|
||||||
TableRPC::SyncRPC(rpc) => {
|
TableRPC::SyncRPC(rpc) => {
|
||||||
let response = self.syncer
|
let response = self.syncer.handle_rpc(rpc).await?;
|
||||||
.handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
|
|
||||||
.await?;
|
|
||||||
Ok(TableRPC::SyncRPC(response))
|
Ok(TableRPC::SyncRPC(response))
|
||||||
}
|
}
|
||||||
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
|
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
|
||||||
|
|
|
@ -1,898 +0,0 @@
|
||||||
use rand::Rng;
|
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use futures::future::join_all;
|
|
||||||
use futures::{pin_mut, select};
|
|
||||||
use futures_util::future::*;
|
|
||||||
use futures_util::stream::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_bytes::ByteBuf;
|
|
||||||
use tokio::sync::{mpsc, watch};
|
|
||||||
|
|
||||||
use garage_rpc::ring::Ring;
|
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::Error;
|
|
||||||
|
|
||||||
use crate::*;
|
|
||||||
use crate::data::*;
|
|
||||||
use crate::replication::*;
|
|
||||||
|
|
||||||
const MAX_DEPTH: usize = 16;
|
|
||||||
|
|
||||||
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
|
||||||
|
|
||||||
// Do anti-entropy every 10 minutes
|
|
||||||
const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
|
|
||||||
|
|
||||||
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
|
||||||
|
|
||||||
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
|
|
||||||
data: Arc<TableData<F>>,
|
|
||||||
aux: Arc<TableAux<F, R>>,
|
|
||||||
|
|
||||||
todo: Mutex<SyncTodo>,
|
|
||||||
cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub(crate) enum SyncRPC {
|
|
||||||
GetRootChecksumRange(Hash, Hash),
|
|
||||||
RootChecksumRange(SyncRange),
|
|
||||||
Checksums(Vec<RangeChecksum>),
|
|
||||||
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SyncTodo {
|
|
||||||
todo: Vec<TodoPartition>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct TodoPartition {
|
|
||||||
// Partition consists in hashes between begin included and end excluded
|
|
||||||
begin: Hash,
|
|
||||||
end: Hash,
|
|
||||||
|
|
||||||
// Are we a node that stores this partition or not?
|
|
||||||
retain: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
// A SyncRange defines a query on the dataset stored by a node, in the following way:
|
|
||||||
// - all items whose key are >= `begin`
|
|
||||||
// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
|
|
||||||
// - except if the first item of the range has such many leading zero bytes
|
|
||||||
// - and stopping at `end` (excluded) if such an item is not found
|
|
||||||
// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
|
|
||||||
// i.e. of ranges of level `level-1` that cover the same range
|
|
||||||
// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
|
|
||||||
// See RangeChecksum for the struct that stores this information.
|
|
||||||
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub(crate) struct SyncRange {
|
|
||||||
begin: Vec<u8>,
|
|
||||||
end: Vec<u8>,
|
|
||||||
level: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::cmp::PartialOrd for SyncRange {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
|
||||||
Some(self.cmp(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl std::cmp::Ord for SyncRange {
|
|
||||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
|
||||||
self.begin
|
|
||||||
.cmp(&other.begin)
|
|
||||||
.then(self.level.cmp(&other.level))
|
|
||||||
.then(self.end.cmp(&other.end))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub(crate) struct RangeChecksum {
|
|
||||||
bounds: SyncRange,
|
|
||||||
children: Vec<(SyncRange, Hash)>,
|
|
||||||
found_limit: Option<Vec<u8>>,
|
|
||||||
|
|
||||||
#[serde(skip, default = "std::time::Instant::now")]
|
|
||||||
time: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct RangeChecksumCache {
|
|
||||||
hash: Option<Hash>, // None if no children
|
|
||||||
found_limit: Option<Vec<u8>>,
|
|
||||||
time: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F, R> TableSyncer<F, R>
|
|
||||||
where
|
|
||||||
F: TableSchema + 'static,
|
|
||||||
R: TableReplication + 'static,
|
|
||||||
{
|
|
||||||
pub(crate) fn launch(data: Arc<TableData<F>>,
|
|
||||||
aux: Arc<TableAux<F, R>>) -> Arc<Self> {
|
|
||||||
let todo = SyncTodo{ todo: vec![] };
|
|
||||||
|
|
||||||
let syncer = Arc::new(Self {
|
|
||||||
data: data.clone(),
|
|
||||||
aux: aux.clone(),
|
|
||||||
todo: Mutex::new(todo),
|
|
||||||
cache: (0..MAX_DEPTH)
|
|
||||||
.map(|_| Mutex::new(BTreeMap::new()))
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let s1 = syncer.clone();
|
|
||||||
aux.system.background.spawn_worker(
|
|
||||||
format!("table sync watcher for {}", data.name),
|
|
||||||
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
|
||||||
);
|
|
||||||
|
|
||||||
let s2 = syncer.clone();
|
|
||||||
aux.system.background.spawn_worker(
|
|
||||||
format!("table syncer for {}", data.name),
|
|
||||||
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
|
||||||
);
|
|
||||||
|
|
||||||
let s3 = syncer.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tokio::time::delay_for(Duration::from_secs(20)).await;
|
|
||||||
s3.add_full_scan();
|
|
||||||
});
|
|
||||||
|
|
||||||
syncer
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn watcher_task(
|
|
||||||
self: Arc<Self>,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
mut busy_rx: mpsc::UnboundedReceiver<bool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
|
|
||||||
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
|
|
||||||
let mut nothing_to_do_since = Some(Instant::now());
|
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
|
||||||
let s_ring_recv = ring_recv.recv().fuse();
|
|
||||||
let s_busy = busy_rx.recv().fuse();
|
|
||||||
let s_must_exit = must_exit.recv().fuse();
|
|
||||||
let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
|
|
||||||
pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
|
|
||||||
|
|
||||||
select! {
|
|
||||||
new_ring_r = s_ring_recv => {
|
|
||||||
if let Some(new_ring) = new_ring_r {
|
|
||||||
debug!("({}) Adding ring difference to syncer todo list", self.data.name);
|
|
||||||
self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
|
|
||||||
prev_ring = new_ring;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
busy_opt = s_busy => {
|
|
||||||
if let Some(busy) = busy_opt {
|
|
||||||
if busy {
|
|
||||||
nothing_to_do_since = None;
|
|
||||||
} else {
|
|
||||||
if nothing_to_do_since.is_none() {
|
|
||||||
nothing_to_do_since = Some(Instant::now());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
must_exit_v = s_must_exit => {
|
|
||||||
if must_exit_v.unwrap_or(false) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = s_timeout => {
|
|
||||||
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
|
|
||||||
nothing_to_do_since = None;
|
|
||||||
debug!("({}) Adding full scan to syncer todo list", self.data.name);
|
|
||||||
self.add_full_scan();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add_full_scan(&self) {
|
|
||||||
self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn syncer_task(
|
|
||||||
self: Arc<Self>,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
busy_tx: mpsc::UnboundedSender<bool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
while !*must_exit.borrow() {
|
|
||||||
let task = self.todo.lock().unwrap().pop_task();
|
|
||||||
if let Some(partition) = task {
|
|
||||||
busy_tx.send(true)?;
|
|
||||||
let res = self
|
|
||||||
.clone()
|
|
||||||
.sync_partition(&partition, &mut must_exit)
|
|
||||||
.await;
|
|
||||||
if let Err(e) = res {
|
|
||||||
warn!(
|
|
||||||
"({}) Error while syncing {:?}: {}",
|
|
||||||
self.data.name, partition, e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
busy_tx.send(false)?;
|
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn sync_partition(
|
|
||||||
self: Arc<Self>,
|
|
||||||
partition: &TodoPartition,
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
if partition.retain {
|
|
||||||
let my_id = self.aux.system.id;
|
|
||||||
let nodes = self
|
|
||||||
.aux
|
|
||||||
.replication
|
|
||||||
.write_nodes(&partition.begin, &self.aux.system)
|
|
||||||
.into_iter()
|
|
||||||
.filter(|node| *node != my_id)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"({}) Preparing to sync {:?} with {:?}...",
|
|
||||||
self.data.name, partition, nodes
|
|
||||||
);
|
|
||||||
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
|
|
||||||
|
|
||||||
let mut sync_futures = nodes
|
|
||||||
.iter()
|
|
||||||
.map(|node| {
|
|
||||||
self.clone().do_sync_with(
|
|
||||||
partition.clone(),
|
|
||||||
root_cks.clone(),
|
|
||||||
*node,
|
|
||||||
must_exit.clone(),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect::<FuturesUnordered<_>>();
|
|
||||||
|
|
||||||
let mut n_errors = 0;
|
|
||||||
while let Some(r) = sync_futures.next().await {
|
|
||||||
if let Err(e) = r {
|
|
||||||
n_errors += 1;
|
|
||||||
warn!("({}) Sync error: {}", self.data.name, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if n_errors > self.aux.replication.max_write_errors() {
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Sync failed with too many nodes (should have been: {:?}).",
|
|
||||||
nodes
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.offload_partition(&partition.begin, &partition.end, must_exit)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Offload partition: this partition is not something we are storing,
|
|
||||||
// so send it out to all other nodes that store it and delete items locally.
|
|
||||||
// We don't bother checking if the remote nodes already have the items,
|
|
||||||
// we just batch-send everything. Offloading isn't supposed to happen very often.
|
|
||||||
// If any of the nodes that are supposed to store the items is unable to
|
|
||||||
// save them, we interrupt the process.
|
|
||||||
async fn offload_partition(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
begin: &Hash,
|
|
||||||
end: &Hash,
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut counter: usize = 0;
|
|
||||||
|
|
||||||
while !*must_exit.borrow() {
|
|
||||||
let mut items = Vec::new();
|
|
||||||
|
|
||||||
for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
|
|
||||||
let (key, value) = item?;
|
|
||||||
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
|
|
||||||
|
|
||||||
if items.len() >= 1024 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if items.len() > 0 {
|
|
||||||
let nodes = self
|
|
||||||
.aux
|
|
||||||
.replication
|
|
||||||
.write_nodes(&begin, &self.aux.system)
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
if nodes.contains(&self.aux.system.id) {
|
|
||||||
warn!("Interrupting offload as partitions seem to have changed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
counter += 1;
|
|
||||||
debug!(
|
|
||||||
"Offloading {} items from {:?}..{:?} ({})",
|
|
||||||
items.len(),
|
|
||||||
begin,
|
|
||||||
end,
|
|
||||||
counter
|
|
||||||
);
|
|
||||||
self.offload_items(&items, &nodes[..]).await?;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn offload_items(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
|
|
||||||
nodes: &[UUID],
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
|
|
||||||
let update_msg = Arc::new(TableRPC::<F>::Update(values));
|
|
||||||
|
|
||||||
for res in join_all(nodes.iter().map(|to| {
|
|
||||||
self.aux
|
|
||||||
.rpc_client
|
|
||||||
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
|
|
||||||
}))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
res?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// All remote nodes have written those items, now we can delete them locally
|
|
||||||
let mut not_removed = 0;
|
|
||||||
for (k, v) in items.iter() {
|
|
||||||
if !self.data.delete_if_equal(&k[..], &v[..])? {
|
|
||||||
not_removed += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if not_removed > 0 {
|
|
||||||
debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn root_checksum(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
begin: &Hash,
|
|
||||||
end: &Hash,
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<RangeChecksum, Error> {
|
|
||||||
for i in 1..MAX_DEPTH {
|
|
||||||
let rc = self.range_checksum(
|
|
||||||
&SyncRange {
|
|
||||||
begin: begin.to_vec(),
|
|
||||||
end: end.to_vec(),
|
|
||||||
level: i,
|
|
||||||
},
|
|
||||||
must_exit,
|
|
||||||
)?;
|
|
||||||
if rc.found_limit.is_none() {
|
|
||||||
return Ok(rc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(Error::Message(format!(
|
|
||||||
"Unable to compute root checksum (this should never happen)"
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn range_checksum(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
range: &SyncRange,
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<RangeChecksum, Error> {
|
|
||||||
assert!(range.level != 0);
|
|
||||||
trace!("Call range_checksum {:?}", range);
|
|
||||||
|
|
||||||
if range.level == 1 {
|
|
||||||
let mut children = vec![];
|
|
||||||
for item in self
|
|
||||||
.data
|
|
||||||
.store
|
|
||||||
.range(range.begin.clone()..range.end.clone())
|
|
||||||
{
|
|
||||||
let (key, value) = item?;
|
|
||||||
let key_hash = blake2sum(&key[..]);
|
|
||||||
if children.len() > 0
|
|
||||||
&& key_hash.as_slice()[0..range.level]
|
|
||||||
.iter()
|
|
||||||
.all(|x| *x == 0u8)
|
|
||||||
{
|
|
||||||
trace!(
|
|
||||||
"range_checksum {:?} returning {} items",
|
|
||||||
range,
|
|
||||||
children.len()
|
|
||||||
);
|
|
||||||
return Ok(RangeChecksum {
|
|
||||||
bounds: range.clone(),
|
|
||||||
children,
|
|
||||||
found_limit: Some(key.to_vec()),
|
|
||||||
time: Instant::now(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let item_range = SyncRange {
|
|
||||||
begin: key.to_vec(),
|
|
||||||
end: vec![],
|
|
||||||
level: 0,
|
|
||||||
};
|
|
||||||
children.push((item_range, blake2sum(&value[..])));
|
|
||||||
}
|
|
||||||
trace!(
|
|
||||||
"range_checksum {:?} returning {} items",
|
|
||||||
range,
|
|
||||||
children.len()
|
|
||||||
);
|
|
||||||
Ok(RangeChecksum {
|
|
||||||
bounds: range.clone(),
|
|
||||||
children,
|
|
||||||
found_limit: None,
|
|
||||||
time: Instant::now(),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
let mut children = vec![];
|
|
||||||
let mut sub_range = SyncRange {
|
|
||||||
begin: range.begin.clone(),
|
|
||||||
end: range.end.clone(),
|
|
||||||
level: range.level - 1,
|
|
||||||
};
|
|
||||||
let mut time = Instant::now();
|
|
||||||
while !*must_exit.borrow() {
|
|
||||||
let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
|
|
||||||
|
|
||||||
if let Some(hash) = sub_ck.hash {
|
|
||||||
children.push((sub_range.clone(), hash));
|
|
||||||
if sub_ck.time < time {
|
|
||||||
time = sub_ck.time;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
|
|
||||||
trace!(
|
|
||||||
"range_checksum {:?} returning {} items",
|
|
||||||
range,
|
|
||||||
children.len()
|
|
||||||
);
|
|
||||||
return Ok(RangeChecksum {
|
|
||||||
bounds: range.clone(),
|
|
||||||
children,
|
|
||||||
found_limit: None,
|
|
||||||
time,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
let found_limit = sub_ck.found_limit.unwrap();
|
|
||||||
|
|
||||||
let actual_limit_hash = blake2sum(&found_limit[..]);
|
|
||||||
if actual_limit_hash.as_slice()[0..range.level]
|
|
||||||
.iter()
|
|
||||||
.all(|x| *x == 0u8)
|
|
||||||
{
|
|
||||||
trace!(
|
|
||||||
"range_checksum {:?} returning {} items",
|
|
||||||
range,
|
|
||||||
children.len()
|
|
||||||
);
|
|
||||||
return Ok(RangeChecksum {
|
|
||||||
bounds: range.clone(),
|
|
||||||
children,
|
|
||||||
found_limit: Some(found_limit.clone()),
|
|
||||||
time,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
sub_range.begin = found_limit;
|
|
||||||
}
|
|
||||||
trace!("range_checksum {:?} exiting due to must_exit", range);
|
|
||||||
Err(Error::Message(format!("Exiting.")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn range_checksum_cached_hash(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
range: &SyncRange,
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<RangeChecksumCache, Error> {
|
|
||||||
{
|
|
||||||
let mut cache = self.cache[range.level].lock().unwrap();
|
|
||||||
if let Some(v) = cache.get(&range) {
|
|
||||||
if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
|
|
||||||
return Ok(v.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cache.remove(&range);
|
|
||||||
}
|
|
||||||
|
|
||||||
let v = self.range_checksum(&range, must_exit)?;
|
|
||||||
trace!(
|
|
||||||
"({}) New checksum calculated for {}-{}/{}, {} children",
|
|
||||||
self.data.name,
|
|
||||||
hex::encode(&range.begin)
|
|
||||||
.chars()
|
|
||||||
.take(16)
|
|
||||||
.collect::<String>(),
|
|
||||||
hex::encode(&range.end).chars().take(16).collect::<String>(),
|
|
||||||
range.level,
|
|
||||||
v.children.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
let hash = if v.children.len() > 0 {
|
|
||||||
Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
let cache_entry = RangeChecksumCache {
|
|
||||||
hash,
|
|
||||||
found_limit: v.found_limit,
|
|
||||||
time: v.time,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut cache = self.cache[range.level].lock().unwrap();
|
|
||||||
cache.insert(range.clone(), cache_entry.clone());
|
|
||||||
Ok(cache_entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_sync_with(
|
|
||||||
self: Arc<Self>,
|
|
||||||
partition: TodoPartition,
|
|
||||||
root_ck: RangeChecksum,
|
|
||||||
who: UUID,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut todo = VecDeque::new();
|
|
||||||
|
|
||||||
// If their root checksum has level > than us, use that as a reference
|
|
||||||
let root_cks_resp = self
|
|
||||||
.aux
|
|
||||||
.rpc_client
|
|
||||||
.call(
|
|
||||||
who,
|
|
||||||
TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
|
|
||||||
partition.begin.clone(),
|
|
||||||
partition.end.clone(),
|
|
||||||
)),
|
|
||||||
TABLE_SYNC_RPC_TIMEOUT,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
|
|
||||||
if range.level > root_ck.bounds.level {
|
|
||||||
let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
|
|
||||||
todo.push_back(their_root_range_ck);
|
|
||||||
} else {
|
|
||||||
todo.push_back(root_ck);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Invalid respone to GetRootChecksumRange RPC: {}",
|
|
||||||
debug_serialize(root_cks_resp)
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
while !todo.is_empty() && !*must_exit.borrow() {
|
|
||||||
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
|
|
||||||
trace!(
|
|
||||||
"({}) Sync with {:?}: {} ({}) remaining",
|
|
||||||
self.data.name,
|
|
||||||
who,
|
|
||||||
todo.len(),
|
|
||||||
total_children
|
|
||||||
);
|
|
||||||
|
|
||||||
let step_size = std::cmp::min(16, todo.len());
|
|
||||||
let step = todo.drain(..step_size).collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let rpc_resp = self
|
|
||||||
.aux
|
|
||||||
.rpc_client
|
|
||||||
.call(
|
|
||||||
who,
|
|
||||||
TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
|
|
||||||
TABLE_SYNC_RPC_TIMEOUT,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
|
|
||||||
rpc_resp
|
|
||||||
{
|
|
||||||
if diff_ranges.len() > 0 || diff_items.len() > 0 {
|
|
||||||
info!(
|
|
||||||
"({}) Sync with {:?}: difference {} ranges, {} items",
|
|
||||||
self.data.name,
|
|
||||||
who,
|
|
||||||
diff_ranges.len(),
|
|
||||||
diff_items.len()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let mut items_to_send = vec![];
|
|
||||||
for differing in diff_ranges.drain(..) {
|
|
||||||
if differing.level == 0 {
|
|
||||||
items_to_send.push(differing.begin);
|
|
||||||
} else {
|
|
||||||
let checksum = self.range_checksum(&differing, &mut must_exit)?;
|
|
||||||
todo.push_back(checksum);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if diff_items.len() > 0 {
|
|
||||||
self.data.update_many(&diff_items[..])?;
|
|
||||||
}
|
|
||||||
if items_to_send.len() > 0 {
|
|
||||||
self.send_items(who, items_to_send).await?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Unexpected response to sync RPC checksums: {}",
|
|
||||||
debug_serialize(&rpc_resp)
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
|
|
||||||
info!(
|
|
||||||
"({}) Sending {} items to {:?}",
|
|
||||||
self.data.name,
|
|
||||||
item_list.len(),
|
|
||||||
who
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut values = vec![];
|
|
||||||
for item in item_list.iter() {
|
|
||||||
if let Some(v) = self.data.store.get(&item[..])? {
|
|
||||||
values.push(Arc::new(ByteBuf::from(v.as_ref())));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let rpc_resp = self
|
|
||||||
.aux
|
|
||||||
.rpc_client
|
|
||||||
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
|
|
||||||
.await?;
|
|
||||||
if let TableRPC::<F>::Ok = rpc_resp {
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
Err(Error::Message(format!(
|
|
||||||
"Unexpected response to RPC Update: {}",
|
|
||||||
debug_serialize(&rpc_resp)
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn handle_rpc(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
message: &SyncRPC,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
) -> Result<SyncRPC, Error> {
|
|
||||||
match message {
|
|
||||||
SyncRPC::GetRootChecksumRange(begin, end) => {
|
|
||||||
let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
|
|
||||||
Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
|
|
||||||
}
|
|
||||||
SyncRPC::Checksums(checksums) => {
|
|
||||||
self.handle_checksums_rpc(&checksums[..], &mut must_exit)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
_ => Err(Error::Message(format!("Unexpected sync RPC"))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_checksums_rpc(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
checksums: &[RangeChecksum],
|
|
||||||
must_exit: &mut watch::Receiver<bool>,
|
|
||||||
) -> Result<SyncRPC, Error> {
|
|
||||||
let mut ret_ranges = vec![];
|
|
||||||
let mut ret_items = vec![];
|
|
||||||
|
|
||||||
for their_ckr in checksums.iter() {
|
|
||||||
let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
|
|
||||||
for (their_range, their_hash) in their_ckr.children.iter() {
|
|
||||||
let differs = match our_ckr
|
|
||||||
.children
|
|
||||||
.binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
|
|
||||||
{
|
|
||||||
Err(_) => {
|
|
||||||
if their_range.level >= 1 {
|
|
||||||
let cached_hash =
|
|
||||||
self.range_checksum_cached_hash(&their_range, must_exit)?;
|
|
||||||
cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(i) => our_ckr.children[i].1 != *their_hash,
|
|
||||||
};
|
|
||||||
if differs {
|
|
||||||
ret_ranges.push(their_range.clone());
|
|
||||||
if their_range.level == 0 {
|
|
||||||
if let Some(item_bytes) =
|
|
||||||
self.data.store.get(their_range.begin.as_slice())?
|
|
||||||
{
|
|
||||||
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (our_range, _hash) in our_ckr.children.iter() {
|
|
||||||
if let Some(their_found_limit) = &their_ckr.found_limit {
|
|
||||||
if our_range.begin.as_slice() > their_found_limit.as_slice() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let not_present = our_ckr
|
|
||||||
.children
|
|
||||||
.binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
|
|
||||||
.is_err();
|
|
||||||
if not_present {
|
|
||||||
if our_range.level > 0 {
|
|
||||||
ret_ranges.push(our_range.clone());
|
|
||||||
}
|
|
||||||
if our_range.level == 0 {
|
|
||||||
if let Some(item_bytes) =
|
|
||||||
self.data.store.get(our_range.begin.as_slice())?
|
|
||||||
{
|
|
||||||
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let n_checksums = checksums
|
|
||||||
.iter()
|
|
||||||
.map(|x| x.children.len())
|
|
||||||
.fold(0, |x, y| x + y);
|
|
||||||
if ret_ranges.len() > 0 || ret_items.len() > 0 {
|
|
||||||
trace!(
|
|
||||||
"({}) Checksum comparison RPC: {} different + {} items for {} received",
|
|
||||||
self.data.name,
|
|
||||||
ret_ranges.len(),
|
|
||||||
ret_items.len(),
|
|
||||||
n_checksums
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(SyncRPC::Difference(ret_ranges, ret_items))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
|
|
||||||
for i in 1..MAX_DEPTH {
|
|
||||||
let needle = SyncRange {
|
|
||||||
begin: item_key.to_vec(),
|
|
||||||
end: vec![],
|
|
||||||
level: i,
|
|
||||||
};
|
|
||||||
let mut cache = self.cache[i].lock().unwrap();
|
|
||||||
if let Some(cache_entry) = cache.range(..=needle).rev().next() {
|
|
||||||
if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
|
|
||||||
let index = cache_entry.0.clone();
|
|
||||||
drop(cache_entry);
|
|
||||||
cache.remove(&index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SyncTodo {
|
|
||||||
fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
|
|
||||||
let my_id = aux.system.id;
|
|
||||||
|
|
||||||
self.todo.clear();
|
|
||||||
|
|
||||||
let ring = aux.system.ring.borrow().clone();
|
|
||||||
let split_points = aux.replication.split_points(&ring);
|
|
||||||
|
|
||||||
for i in 0..split_points.len() - 1 {
|
|
||||||
let begin = split_points[i];
|
|
||||||
let end = split_points[i + 1];
|
|
||||||
if begin == end {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let nodes = aux.replication.replication_nodes(&begin, &ring);
|
|
||||||
|
|
||||||
let retain = nodes.contains(&my_id);
|
|
||||||
if !retain {
|
|
||||||
// Check if we have some data to send, otherwise skip
|
|
||||||
if data.store.range(begin..end).next().is_none() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.todo.push(TodoPartition { begin, end, retain });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_ring_difference<F: TableSchema, R: TableReplication>(
|
|
||||||
&mut self,
|
|
||||||
old_ring: &Ring,
|
|
||||||
new_ring: &Ring,
|
|
||||||
data: &TableData<F>, aux: &TableAux<F, R>,
|
|
||||||
) {
|
|
||||||
let my_id = aux.system.id;
|
|
||||||
|
|
||||||
// If it is us who are entering or leaving the system,
|
|
||||||
// initiate a full sync instead of incremental sync
|
|
||||||
if old_ring.config.members.contains_key(&my_id)
|
|
||||||
!= new_ring.config.members.contains_key(&my_id)
|
|
||||||
{
|
|
||||||
self.add_full_scan(data, aux);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut all_points = None
|
|
||||||
.into_iter()
|
|
||||||
.chain(aux.replication.split_points(old_ring).drain(..))
|
|
||||||
.chain(aux.replication.split_points(new_ring).drain(..))
|
|
||||||
.chain(self.todo.iter().map(|x| x.begin))
|
|
||||||
.chain(self.todo.iter().map(|x| x.end))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
all_points.sort();
|
|
||||||
all_points.dedup();
|
|
||||||
|
|
||||||
let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
|
|
||||||
old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
|
|
||||||
let mut new_todo = vec![];
|
|
||||||
|
|
||||||
for i in 0..all_points.len() - 1 {
|
|
||||||
let begin = all_points[i];
|
|
||||||
let end = all_points[i + 1];
|
|
||||||
let was_ours = aux
|
|
||||||
.replication
|
|
||||||
.replication_nodes(&begin, &old_ring)
|
|
||||||
.contains(&my_id);
|
|
||||||
let is_ours = aux
|
|
||||||
.replication
|
|
||||||
.replication_nodes(&begin, &new_ring)
|
|
||||||
.contains(&my_id);
|
|
||||||
|
|
||||||
let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
|
|
||||||
Ok(_) => true,
|
|
||||||
Err(j) => {
|
|
||||||
(j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
|
|
||||||
|| (j < old_todo.len()
|
|
||||||
&& old_todo[j].begin < end && begin < old_todo[j].end)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
|
|
||||||
new_todo.push(TodoPartition {
|
|
||||||
begin,
|
|
||||||
end,
|
|
||||||
retain: is_ours,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.todo = new_todo;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pop_task(&mut self) -> Option<TodoPartition> {
|
|
||||||
if self.todo.is_empty() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
|
|
||||||
if i == self.todo.len() - 1 {
|
|
||||||
self.todo.pop()
|
|
||||||
} else {
|
|
||||||
let replacement = self.todo.pop().unwrap();
|
|
||||||
let ret = std::mem::replace(&mut self.todo[i], replacement);
|
|
||||||
Some(ret)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue