WIP add content-defined chunking #42

Closed
trinity-1686a wants to merge 42 commits from content-defined-chunking into master
16 changed files with 387 additions and 325 deletions
Showing only changes of commit 94f3d28774

Cargo.lock generated
View file

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.15"
@ -583,7 +585,6 @@ dependencies = [
name = "garage_table"
version = "0.1.1"
dependencies = [
"arc-swap",
"async-trait",
"bytes 0.4.12",
"futures",

View file

@ -28,38 +28,23 @@ impl Repair {
self.garage
.bucket_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
.add_full_scan();
self.garage
.object_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
.add_full_scan();
self.garage
.version_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
.add_full_scan();
self.garage
.block_ref_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
.add_full_scan();
self.garage
.key_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
.add_full_scan();
}
// TODO: wait for full sync to finish before proceeding to the rest?
@ -93,7 +78,7 @@ impl Repair {
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
while let Some((item_key, item_bytes)) = self.garage.version_table.data.store.get_gt(&pos)? {
pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
@ -141,7 +126,7 @@ impl Repair {
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
while let Some((item_key, item_bytes)) = self.garage.block_ref_table.data.store.get_gt(&pos)? {
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
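
The repair loops above walk the whole tree with a resumable cursor: they remember the last key seen and ask the store for the first entry strictly greater than it. Below is a minimal stand-alone sketch of that pattern, using a BTreeMap in place of the sled tree; the get_gt helper here is my own stand-in for sled's get_gt, and the keys are made up.

use std::collections::BTreeMap;
use std::ops::Bound;

// Stand-in for sled::Tree::get_gt: first entry with a key strictly greater
// than `pos`, which is what repair_versions / repair_block_ref use as a cursor.
fn get_gt(store: &BTreeMap<Vec<u8>, Vec<u8>>, pos: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
    store
        .range((Bound::Excluded(pos.to_vec()), Bound::Unbounded))
        .next()
        .map(|(k, v)| (k.clone(), v.clone()))
}

fn main() {
    let mut store = BTreeMap::new();
    store.insert(b"a".to_vec(), b"1".to_vec());
    store.insert(b"b".to_vec(), b"2".to_vec());

    // Same shape as the repair loops: advance the cursor to the last key seen,
    // so the scan can be interrupted and resumed at any point.
    let mut pos = vec![];
    let mut seen = 0;
    while let Some((key, _value)) = get_gt(&store, &pos) {
        pos = key;
        seen += 1;
    }
    assert_eq!(seen, 2);
}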

View file

@ -19,8 +19,7 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication;
use garage_table::TableReplication;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
use crate::block_ref_table::*;
@ -412,7 +411,7 @@ impl BlockManager {
let garage = self.garage.load_full().unwrap();
let mut last_hash = None;
let mut i = 0usize;
for entry in garage.block_ref_table.store.iter() {
for entry in garage.block_ref_table.data.store.iter() {
let (_k, v_bytes) = entry?;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
if Some(&block_ref.block) == last_hash.as_ref() {

View file

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
use garage_table::table_fullcopy::*;
use garage_table::table_sharded::*;
use garage_table::replication::sharded::*;
use garage_table::replication::fullcopy::*;
use garage_table::*;
use crate::block::*;

View file

@ -6,7 +6,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::replication::sharded::*;
use garage_table::*;
use crate::version_table::*;

View file

@ -5,7 +5,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::replication::sharded::*;
use garage_table::*;
use crate::block_ref_table::*;

View file

@ -19,7 +19,6 @@ garage_rpc = { version = "0.1.1", path = "../rpc" }
bytes = "0.4"
rand = "0.7"
hex = "0.3"
arc-swap = "0.4"
log = "0.4"
hexdump = "0.1"

src/table/data.rs (new file, 189 additions)
View file

@ -0,0 +1,189 @@
use std::sync::Arc;
use log::warn;
use sled::Transactional;
use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::background::BackgroundRunner;
use crate::schema::*;
use crate::merkle::*;
use crate::crdt::CRDT;
pub struct TableData<F: TableSchema> {
pub name: String,
pub instance: F,
pub store: sled::Tree,
pub(crate) merkle_updater: Arc<MerkleUpdater>,
}
impl<F> TableData<F> where F: TableSchema {
pub fn new(
name: String,
instance: F,
db: &sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");
let merkle_todo_store = db
.open_tree(&format!("{}:merkle_todo", name))
.expect("Unable to open DB Merkle TODO tree");
let merkle_tree_store = db
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
let merkle_updater = MerkleUpdater::launch(
name.clone(),
background,
merkle_todo_store,
merkle_tree_store,
);
Arc::new(Self{
name,
instance,
store,
merkle_updater,
})
}
// Read functions
pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s);
if let Some(bytes) = self.store.get(&tree_key)? {
Ok(Some(ByteBuf::from(bytes.to_vec())))
} else {
Ok(None)
}
}
pub fn read_range(
&self,
p: &F::P,
s: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
let first_key = match s {
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(p, sk),
};
let mut ret = vec![];
for item in self.store.range(first_key..) {
let (key, value) = item?;
if &key[..32] != partition_hash.as_slice() {
break;
}
let keep = match filter {
None => true,
Some(f) => {
let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
if keep {
ret.push(Arc::new(ByteBuf::from(value.as_ref())));
}
if ret.len() >= limit {
break;
}
}
Ok(ret)
}
// Mutation functions
pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
for update_bytes in entries.iter() {
self.update_entry(update_bytes.as_slice())?;
}
Ok(())
}
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
let old_entry = self
.decode_entry(&prev_bytes)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
(Some(old_entry), new_entry)
}
None => (None, update.clone()),
};
if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
db.insert(tree_key.clone(), new_bytes)?;
Ok(Some((old_entry, new_entry)))
} else {
Ok(None)
}
})?;
if let Some((old_entry, new_entry)) = changed {
self.instance.updated(old_entry, Some(new_entry));
//self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? {
if cur_v == v {
txn.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(true);
}
}
Ok(false)
})?;
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
//self.syncer.load_full().unwrap().invalidate(k);
}
Ok(removed)
}
pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
ret
}
pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
None => {
warn!("Unable to decode entry of {}: {}", self.name, e);
for line in hexdump::hexdump_iter(bytes) {
debug!("{}", line);
}
Err(e.into())
}
},
}
}
}
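
The helpers at the bottom of this file, tree_key and the 32-byte prefix check in read_range, define how entries are laid out in the tree: the partition hash comes first, the sort key after it, so a partition scan is a range scan that stops as soon as the prefix changes. A small self-contained sketch of that layout, using a BTreeMap as a stand-in for the sled tree and made-up keys:

use std::collections::BTreeMap;

// Stand-in for the 32-byte partition hash produced by `p.hash()`.
type PartitionHash = [u8; 32];

// Mirror of `tree_key`: 32-byte partition hash followed by the raw sort key.
fn tree_key(partition: &PartitionHash, sort_key: &[u8]) -> Vec<u8> {
    let mut k = partition.to_vec();
    k.extend_from_slice(sort_key);
    k
}

// Mirror of the scan in `read_range`: start at the first possible key of the
// partition and stop as soon as the 32-byte prefix no longer matches.
fn read_partition<'a>(
    store: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    partition: &PartitionHash,
) -> Vec<&'a Vec<u8>> {
    store
        .range(partition.to_vec()..)
        .take_while(|(key, _)| &key[..32] == partition.as_slice())
        .map(|(_, value)| value)
        .collect()
}

fn main() {
    let mut store = BTreeMap::new();
    let p1 = [1u8; 32];
    let p2 = [2u8; 32];
    store.insert(tree_key(&p1, b"a"), b"v1".to_vec());
    store.insert(tree_key(&p1, b"b"), b"v2".to_vec());
    store.insert(tree_key(&p2, b"a"), b"v3".to_vec());
    // Only the two entries of partition p1 are returned.
    assert_eq!(read_partition(&store, &p1).len(), 2);
}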

View file

@ -8,9 +8,9 @@ pub mod schema;
pub mod util;
pub mod merkle;
pub mod replication;
pub mod data;
pub mod table;
pub mod table_fullcopy;
pub mod table_sharded;
pub mod table_sync;
pub use schema::*;

View file

@ -61,7 +61,7 @@ pub enum MerkleNode {
}
impl MerkleUpdater {
pub(crate) fn new(
pub(crate) fn launch(
table_name: String,
background: Arc<BackgroundRunner>,
todo: sled::Tree,
@ -69,22 +69,22 @@ impl MerkleUpdater {
) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
Arc::new(Self {
let ret = Arc::new(Self {
table_name,
background,
todo,
todo_notify: Notify::new(),
merkle_tree,
empty_node_hash,
})
}
});
pub(crate) fn launch(self: &Arc<Self>) {
let self2 = self.clone();
self.background.spawn_worker(
format!("Merkle tree updater for {}", self.table_name),
|must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit),
let ret2 = ret.clone();
ret.background.spawn_worker(
format!("Merkle tree updater for {}", ret.table_name),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
ret
}
async fn updater_loop(
@ -156,28 +156,37 @@ impl MerkleUpdater {
new_vhash: Option<Hash>,
) -> ConflictableTransactionResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
// Calculate an update to apply to this node
// This update is an Option<_>, so that it is None if the update is a no-op
// and we can thus skip recalculating and re-storing everything
let mutate = match self.read_node_txn(tx, &key)? {
MerkleNode::Empty => {
if let Some(vhv) = new_vhash {
Some(MerkleNode::Leaf(k.to_vec(), vhv))
} else {
// Nothing to do, keep empty node
None
}
}
MerkleNode::Intermediate(mut children) => {
let key2 = key.next_key(khash);
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
// Subtree changed, update this node as well
if subhash == self.empty_node_hash {
intermediate_rm_child(&mut children, key2.prefix[i]);
} else {
intermediate_set_child(&mut children, key2.prefix[i], subhash);
}
if children.len() == 0 {
// should not happen
warn!("Replacing intermediate node with empty node, should not happen.");
Some(MerkleNode::Empty)
} else if children.len() == 1 {
// move node down to this level
// We now have a single node (case when the update deleted one of only two
// children). Move that single child to this level of the tree.
let key_sub = key.add_byte(children[0].0);
let subnode = self.read_node_txn(tx, &key_sub)?;
tx.remove(key_sub.encode())?;
@ -186,19 +195,23 @@ impl MerkleUpdater {
Some(MerkleNode::Intermediate(children))
}
} else {
// Subtree not changed, nothing to do
None
}
}
MerkleNode::Leaf(exlf_key, exlf_hash) => {
if exlf_key == k {
// This leaf is for the same key that the one we are updating
match new_vhash {
Some(vhv) if vhv == exlf_hash => None,
Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
None => Some(MerkleNode::Empty),
}
} else {
// This is an only leaf for another key
if let Some(vhv) = new_vhash {
// Create two sub-nodes and replace by intermediary node
// Move that other key to a subnode, create another subnode for our
// insertion and replace current node by an intermediary node
let (pos1, h1) = {
let key2 = key.next_key(blake2sum(&exlf_key[..]));
let subhash =
@ -216,6 +229,9 @@ impl MerkleUpdater {
intermediate_set_child(&mut int, pos2, h2);
Some(MerkleNode::Intermediate(int))
} else {
// Nothing to do, we don't want to insert this value because it is None,
// and we don't want to change the other value because it's for something
// else
None
}
}
@ -263,6 +279,7 @@ impl MerkleUpdater {
}
}
// Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(
&self,
k: &MerkleNodeKey,
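
The intermediate-node branch of update_item_rec above relies on two helpers, intermediate_set_child and intermediate_rm_child, to keep the per-byte child list up to date before deciding whether to drop or collapse the node. A minimal stand-alone sketch of that bookkeeping, assuming the children are kept as a sorted Vec<(u8, Hash)> (the real helpers live elsewhere in merkle.rs and may differ in detail):

// `Hash` stands in for the 32-byte digest type from garage_util::data.
type Hash = [u8; 32];

// Insert or replace the child at byte position `pos`, keeping the list sorted.
fn intermediate_set_child(children: &mut Vec<(u8, Hash)>, pos: u8, h: Hash) {
    match children.binary_search_by_key(&pos, |(p, _)| *p) {
        Ok(i) => children[i].1 = h,
        Err(i) => children.insert(i, (pos, h)),
    }
}

// Remove the child at byte position `pos`, if present.
fn intermediate_rm_child(children: &mut Vec<(u8, Hash)>, pos: u8) {
    if let Ok(i) = children.binary_search_by_key(&pos, |(p, _)| *p) {
        children.remove(i);
    }
}

fn main() {
    let mut children: Vec<(u8, Hash)> = vec![];
    intermediate_set_child(&mut children, 3, [1; 32]);
    intermediate_set_child(&mut children, 1, [2; 32]);
    intermediate_rm_child(&mut children, 3);
    // A single remaining child is the case where update_item_rec moves that
    // child down one level instead of keeping the intermediate node.
    assert_eq!(children.len(), 1);
}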

View file

@ -4,7 +4,7 @@ use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
use crate::*;
use crate::replication::*;
#[derive(Clone)]
pub struct TableFullReplication {

View file

@ -0,0 +1,6 @@
mod parameters;
pub mod fullcopy;
pub mod sharded;
pub use parameters::*;

View file

@ -0,0 +1,22 @@
use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
// Which nodes to send reads from
fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
fn read_quorum(&self) -> usize;
// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
fn write_quorum(&self, system: &System) -> usize;
fn max_write_errors(&self) -> usize;
// Which are the nodes that do actually replicate the data
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
fn split_points(&self, ring: &Ring) -> Vec<Hash>;
}
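
A toy strategy makes this contract concrete. The sketch below mirrors only the read/write half of the trait, with stand-in types for System, Hash and UUID (the real ones come from garage_rpc and garage_util); it always picks the first replication_factor nodes with majority quorums, which is not what TableShardedReplication or TableFullReplication do, just an illustration of the interface shape.

// Stand-ins for types the real trait takes from garage_rpc / garage_util.
type Uuid = [u8; 32];
type Hash = [u8; 32];
struct System { nodes: Vec<Uuid> }

// Trimmed mirror of the read/write half of TableReplication.
trait Replication {
    fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<Uuid>;
    fn read_quorum(&self) -> usize;
    fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<Uuid>;
    fn write_quorum(&self, system: &System) -> usize;
    fn max_write_errors(&self) -> usize;
}

// Toy strategy: always use the first `replication_factor` nodes.
struct FirstN { replication_factor: usize }

impl Replication for FirstN {
    fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<Uuid> {
        system.nodes.iter().take(self.replication_factor).copied().collect()
    }
    fn read_quorum(&self) -> usize {
        self.replication_factor / 2 + 1
    }
    fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<Uuid> {
        self.read_nodes(hash, system)
    }
    fn write_quorum(&self, _system: &System) -> usize {
        self.replication_factor / 2 + 1
    }
    fn max_write_errors(&self) -> usize {
        self.replication_factor - (self.replication_factor / 2 + 1)
    }
}

fn main() {
    let system = System { nodes: vec![[1; 32], [2; 32], [3; 32], [4; 32]] };
    let strat = FirstN { replication_factor: 3 };
    assert_eq!(strat.write_nodes(&[0; 32], &system).len(), 3);
    assert_eq!(strat.write_quorum(&system), 2);
}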

View file

@ -2,7 +2,7 @@ use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
use crate::*;
use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {

View file

@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
use log::warn;
use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use sled::Transactional;
use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::merkle::*;
use crate::data::*;
use crate::schema::*;
use crate::table_sync::*;
use crate::replication::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Table<F: TableSchema, R: TableReplication> {
pub instance: F,
pub replication: R,
pub name: String,
pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
pub struct TableAux<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub store: sled::Tree,
pub syncer: ArcSwapOption<TableSyncer<F, R>>,
merkle_updater: Arc<MerkleUpdater>,
pub replication: R,
pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
}
pub struct Table<F: TableSchema, R: TableReplication> {
pub data: Arc<TableData<F>>,
pub aux: Arc<TableAux<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
}
#[derive(Serialize, Deserialize)]
@ -55,23 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
// Which nodes to send reads from
fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
fn read_quorum(&self) -> usize;
// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
fn write_quorum(&self, system: &System) -> usize;
fn max_write_errors(&self) -> usize;
// Which are the nodes that do actually replicate the data
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
fn split_points(&self, ring: &Ring) -> Vec<Hash>;
}
impl<F, R> Table<F, R>
where
@ -88,60 +66,51 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");
let merkle_todo_store = db
.open_tree(&format!("{}:merkle_todo", name))
.expect("Unable to open DB Merkle TODO tree");
let merkle_tree_store = db
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
let merkle_updater = MerkleUpdater::new(
name.clone(),
let data = TableData::new(
name,
instance,
db,
system.background.clone(),
merkle_todo_store,
merkle_tree_store,
);
let aux = Arc::new(TableAux{
system,
replication,
rpc_client,
});
let syncer = TableSyncer::launch(
data.clone(),
aux.clone(),
);
let table = Arc::new(Self {
instance,
replication,
name,
rpc_client,
system,
store,
syncer: ArcSwapOption::from(None),
merkle_updater,
data,
aux,
syncer,
});
table.clone().register_handler(rpc_server, rpc_path);
let syncer = TableSyncer::launch(table.clone());
table.syncer.swap(Some(syncer));
table.merkle_updater.launch();
table
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
let who = self.replication.write_nodes(&hash, &self.system);
let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
self.rpc_client
self.aux.rpc_client
.try_call_many(
&who[..],
rpc,
RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system))
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@ -153,7 +122,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
let who = self.replication.write_nodes(&hash, &self.system);
let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@ -166,7 +135,7 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@ -177,7 +146,7 @@ where
errors.push(e);
}
}
if errors.len() > self.replication.max_write_errors() {
if errors.len() > self.aux.replication.max_write_errors() {
Err(Error::Message("Too many errors".into()))
} else {
Ok(())
@ -190,16 +159,17 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
let who = self.replication.read_nodes(&hash, &self.system);
let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.aux
.rpc_client
.try_call_many(
&who[..],
rpc,
RequestStrategy::with_quorum(self.replication.read_quorum())
RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@ -210,7 +180,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
let v = self.decode_entry(v_bytes.as_slice())?;
let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@ -230,7 +200,7 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
self.system
self.aux.system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@ -246,16 +216,16 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.replication.read_nodes(&hash, &self.system);
let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
.rpc_client
.aux.rpc_client
.try_call_many(
&who[..],
rpc,
RequestStrategy::with_quorum(self.replication.read_quorum())
RequestStrategy::with_quorum(self.aux.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@ -266,8 +236,8 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
let entry = self.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
let entry = self.data.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
ret.insert(entry_key, Some(entry));
@ -287,7 +257,7 @@ where
}
if !to_repair.is_empty() {
let self2 = self.clone();
self.system.background.spawn_cancellable(async move {
self.aux.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?;
}
@ -306,7 +276,7 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_client
self.aux.rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@ -326,8 +296,8 @@ where
});
let self2 = self.clone();
self.rpc_client
.set_local_handler(self.system.id, move |msg| {
self.aux.rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});
@ -336,157 +306,24 @@ where
async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
let value = self.handle_read_entry(key, sort_key)?;
let value = self.data.read_entry(key, sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
self.handle_update(pairs)?;
self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
let syncer = self.syncer.load_full().unwrap();
let response = syncer
.handle_rpc(rpc, self.system.background.stop_signal.clone())
let response = self.syncer
.handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
.await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}
fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s);
if let Some(bytes) = self.store.get(&tree_key)? {
Ok(Some(ByteBuf::from(bytes.to_vec())))
} else {
Ok(None)
}
}
fn handle_read_range(
&self,
p: &F::P,
s: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
let first_key = match s {
None => partition_hash.to_vec(),
Some(sk) => self.tree_key(p, sk),
};
let mut ret = vec![];
for item in self.store.range(first_key..) {
let (key, value) = item?;
if &key[..32] != partition_hash.as_slice() {
break;
}
let keep = match filter {
None => true,
Some(f) => {
let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
if keep {
ret.push(Arc::new(ByteBuf::from(value.as_ref())));
}
if ret.len() >= limit {
break;
}
}
Ok(ret)
}
// ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
for update_bytes in entries.iter() {
self.update_entry(update_bytes.as_slice())?;
}
Ok(())
}
pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
let old_entry = self
.decode_entry(&prev_bytes)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
(Some(old_entry), new_entry)
}
None => (None, update.clone()),
};
if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
db.insert(tree_key.clone(), new_bytes)?;
Ok(Some((old_entry, new_entry)))
} else {
Ok(None)
}
})?;
if let Some((old_entry, new_entry)) = changed {
self.instance.updated(old_entry, Some(new_entry));
self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? {
if cur_v == v {
txn.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(true);
}
}
Ok(false)
})?;
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
self.syncer.load_full().unwrap().invalidate(k);
}
Ok(removed)
}
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
ret
}
fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
None => {
warn!("Unable to decode entry of {}: {}", self.name, e);
for line in hexdump::hexdump_iter(bytes) {
debug!("{}", line);
}
Err(e.into())
}
},
}
}
}
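
On the read path above, get folds the replies from the queried nodes into one entry and schedules repair_on_read when they disagree. The sketch below shows that merge-and-detect-divergence step in isolation, with a grow-only set standing in for F::E (the real entries use the crate's CRDT merge); empty replies are simply skipped here, which is a simplification.

use std::collections::BTreeSet;

// Stand-in entry type: a grow-only set, merged by union.
type Entry = BTreeSet<String>;

fn merge(acc: &mut Entry, other: &Entry) {
    for x in other {
        acc.insert(x.clone());
    }
}

// Mirror of the merge loop in `get`: fold the non-empty replies together and
// remember whether any of them differed from the running result; in that case
// the real code spawns repair_on_read against the nodes that were queried.
fn merge_read_responses(responses: Vec<Option<Entry>>) -> (Option<Entry>, bool) {
    let mut ret: Option<Entry> = None;
    let mut not_all_same = false;
    for value in responses.into_iter().flatten() {
        ret = match ret {
            None => Some(value),
            Some(mut acc) => {
                if acc != value {
                    not_all_same = true;
                    merge(&mut acc, &value);
                }
                Some(acc)
            }
        };
    }
    (ret, not_all_same)
}

fn main() {
    let a: Entry = ["k1".to_string()].into_iter().collect();
    let b: Entry = ["k1".to_string(), "k2".to_string()].into_iter().collect();
    let (merged, diverged) = merge_read_responses(vec![Some(a), Some(b), None]);
    assert!(diverged); // replicas disagreed, so a repair would be scheduled
    assert_eq!(merged.unwrap().len(), 2);
}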

View file

@ -16,18 +16,22 @@ use garage_util::data::*;
use garage_util::error::Error;
use crate::*;
use crate::data::*;
use crate::replication::*;
const MAX_DEPTH: usize = 16;
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// Scan & sync every 12 hours
const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60);
// Do anti-entropy every 10 minutes
const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
// Expire cache after 30 minutes
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
table: Arc<Table<F, R>>,
data: Arc<TableData<F>>,
aux: Arc<TableAux<F, R>>,
todo: Mutex<SyncTodo>,
cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
}
@ -106,10 +110,13 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer {
table: table.clone(),
pub(crate) fn launch(data: Arc<TableData<F>>,
aux: Arc<TableAux<F, R>>) -> Arc<Self> {
let todo = SyncTodo{ todo: vec![] };
let syncer = Arc::new(Self {
data: data.clone(),
aux: aux.clone(),
todo: Mutex::new(todo),
cache: (0..MAX_DEPTH)
.map(|_| Mutex::new(BTreeMap::new()))
@ -119,21 +126,21 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
table.system.background.spawn_worker(
format!("table sync watcher for {}", table.name),
aux.system.background.spawn_worker(
format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
table.system.background.spawn_worker(
format!("table syncer for {}", table.name),
aux.system.background.spawn_worker(
format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(20)).await;
s3.add_full_scan().await;
s3.add_full_scan();
});
syncer
@ -144,8 +151,8 @@ where
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
@ -158,8 +165,8 @@ where
select! {
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
debug!("({}) Adding ring difference to syncer todo list", self.table.name);
self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring);
debug!("({}) Adding ring difference to syncer todo list", self.data.name);
self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
prev_ring = new_ring;
}
}
@ -182,8 +189,8 @@ where
_ = s_timeout => {
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Adding full scan to syncer todo list", self.table.name);
self.add_full_scan().await;
debug!("({}) Adding full scan to syncer todo list", self.data.name);
self.add_full_scan();
}
}
}
@ -191,8 +198,8 @@ where
Ok(())
}
pub async fn add_full_scan(&self) {
self.todo.lock().unwrap().add_full_scan(&self.table);
pub fn add_full_scan(&self) {
self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
}
async fn syncer_task(
@ -211,7 +218,7 @@ where
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
self.table.name, partition, e
self.data.name, partition, e
);
}
} else {
@ -228,18 +235,18 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
let my_id = self.table.system.id;
let my_id = self.aux.system.id;
let nodes = self
.table
.aux
.replication
.write_nodes(&partition.begin, &self.table.system)
.write_nodes(&partition.begin, &self.aux.system)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
debug!(
"({}) Preparing to sync {:?} with {:?}...",
self.table.name, partition, nodes
self.data.name, partition, nodes
);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
@ -259,10 +266,10 @@ where
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
n_errors += 1;
warn!("({}) Sync error: {}", self.table.name, e);
warn!("({}) Sync error: {}", self.data.name, e);
}
}
if n_errors > self.table.replication.max_write_errors() {
if n_errors > self.aux.replication.max_write_errors() {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
@ -293,7 +300,7 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
for item in self.table.store.range(begin.to_vec()..end.to_vec()) {
for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
let (key, value) = item?;
items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
@ -304,12 +311,12 @@ where
if items.len() > 0 {
let nodes = self
.table
.aux
.replication
.write_nodes(&begin, &self.table.system)
.write_nodes(&begin, &self.aux.system)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.table.system.id) {
if nodes.contains(&self.aux.system.id) {
warn!("Interrupting offload as partitions seem to have changed");
break;
}
@ -340,7 +347,7 @@ where
let update_msg = Arc::new(TableRPC::<F>::Update(values));
for res in join_all(nodes.iter().map(|to| {
self.table
self.aux
.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
}))
@ -352,7 +359,7 @@ where
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
for (k, v) in items.iter() {
if !self.table.delete_if_equal(&k[..], &v[..])? {
if !self.data.delete_if_equal(&k[..], &v[..])? {
not_removed += 1;
}
}
@ -399,7 +406,7 @@ where
if range.level == 1 {
let mut children = vec![];
for item in self
.table
.data
.store
.range(range.begin.clone()..range.end.clone())
{
@ -516,7 +523,7 @@ where
let v = self.range_checksum(&range, must_exit)?;
trace!(
"({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
self.data.name,
hex::encode(&range.begin)
.chars()
.take(16)
@ -553,7 +560,7 @@ where
// If their root checksum has level > than us, use that as a reference
let root_cks_resp = self
.table
.aux
.rpc_client
.call(
who,
@ -582,7 +589,7 @@ where
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
trace!(
"({}) Sync with {:?}: {} ({}) remaining",
self.table.name,
self.data.name,
who,
todo.len(),
total_children
@ -592,7 +599,7 @@ where
let step = todo.drain(..step_size).collect::<Vec<_>>();
let rpc_resp = self
.table
.aux
.rpc_client
.call(
who,
@ -606,7 +613,7 @@ where
if diff_ranges.len() > 0 || diff_items.len() > 0 {
info!(
"({}) Sync with {:?}: difference {} ranges, {} items",
self.table.name,
self.data.name,
who,
diff_ranges.len(),
diff_items.len()
@ -622,7 +629,7 @@ where
}
}
if diff_items.len() > 0 {
self.table.handle_update(&diff_items[..])?;
self.data.update_many(&diff_items[..])?;
}
if items_to_send.len() > 0 {
self.send_items(who, items_to_send).await?;
@ -640,19 +647,19 @@ where
async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.table.name,
self.data.name,
item_list.len(),
who
);
let mut values = vec![];
for item in item_list.iter() {
if let Some(v) = self.table.store.get(&item[..])? {
if let Some(v) = self.data.store.get(&item[..])? {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
let rpc_resp = self
.table
.aux
.rpc_client
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
@ -714,7 +721,7 @@ where
ret_ranges.push(their_range.clone());
if their_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(their_range.begin.as_slice())?
self.data.store.get(their_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@ -738,7 +745,7 @@ where
}
if our_range.level == 0 {
if let Some(item_bytes) =
self.table.store.get(our_range.begin.as_slice())?
self.data.store.get(our_range.begin.as_slice())?
{
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
@ -753,7 +760,7 @@ where
if ret_ranges.len() > 0 || ret_items.len() > 0 {
trace!(
"({}) Checksum comparison RPC: {} different + {} items for {} received",
self.table.name,
self.data.name,
ret_ranges.len(),
ret_items.len(),
n_checksums
@ -782,13 +789,13 @@ where
}
impl SyncTodo {
fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
let my_id = table.system.id;
fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
let my_id = aux.system.id;
self.todo.clear();
let ring = table.system.ring.borrow().clone();
let split_points = table.replication.split_points(&ring);
let ring = aux.system.ring.borrow().clone();
let split_points = aux.replication.split_points(&ring);
for i in 0..split_points.len() - 1 {
let begin = split_points[i];
@ -797,12 +804,12 @@ impl SyncTodo {
continue;
}
let nodes = table.replication.replication_nodes(&begin, &ring);
let nodes = aux.replication.replication_nodes(&begin, &ring);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
if table.store.range(begin..end).next().is_none() {
if data.store.range(begin..end).next().is_none() {
continue;
}
}
@ -813,25 +820,25 @@ impl SyncTodo {
fn add_ring_difference<F: TableSchema, R: TableReplication>(
&mut self,
table: &Table<F, R>,
old_ring: &Ring,
new_ring: &Ring,
data: &TableData<F>, aux: &TableAux<F, R>,
) {
let my_id = table.system.id;
let my_id = aux.system.id;
// If it is us who are entering or leaving the system,
// initiate a full sync instead of incremental sync
if old_ring.config.members.contains_key(&my_id)
!= new_ring.config.members.contains_key(&my_id)
{
self.add_full_scan(table);
self.add_full_scan(data, aux);
return;
}
let mut all_points = None
.into_iter()
.chain(table.replication.split_points(old_ring).drain(..))
.chain(table.replication.split_points(new_ring).drain(..))
.chain(aux.replication.split_points(old_ring).drain(..))
.chain(aux.replication.split_points(new_ring).drain(..))
.chain(self.todo.iter().map(|x| x.begin))
.chain(self.todo.iter().map(|x| x.end))
.collect::<Vec<_>>();
@ -845,11 +852,11 @@ impl SyncTodo {
for i in 0..all_points.len() - 1 {
let begin = all_points[i];
let end = all_points[i + 1];
let was_ours = table
let was_ours = aux
.replication
.replication_nodes(&begin, &old_ring)
.contains(&my_id);
let is_ours = table
let is_ours = aux
.replication
.replication_nodes(&begin, &new_ring)
.contains(&my_id);
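
For reference, the todo-list construction in add_full_scan boils down to: take consecutive split points, drop empty ranges, and mark each range with a retain flag depending on whether this node is among its replicas. A stand-alone sketch of that step with toy types (the real split points and replica sets come from the Ring, and the real code additionally skips non-retained ranges for which the local store holds no data):

type Hash = [u8; 32];

struct Partition {
    begin: Hash,
    end: Hash,
    // true if this node replicates the range and must keep the data,
    // false if the data should be offloaded to the responsible nodes.
    retain: bool,
}

// Build the partition list from an already-sorted list of split points,
// marking each range according to whether `my_id` is among its replicas.
fn partitions_from_split_points(
    split_points: &[Hash],
    my_id: Hash,
    replicas_of: impl Fn(&Hash) -> Vec<Hash>,
) -> Vec<Partition> {
    let mut out = vec![];
    for w in split_points.windows(2) {
        let (begin, end) = (w[0], w[1]);
        if begin == end {
            continue;
        }
        let retain = replicas_of(&begin).contains(&my_id);
        out.push(Partition { begin, end, retain });
    }
    out
}

fn main() {
    let points = [[0u8; 32], [128u8; 32], [255u8; 32]];
    let me = [1u8; 32];
    // Toy replication: this node only replicates ranges starting below 128.
    let parts = partitions_from_split_points(&points, me, |begin| {
        if begin[0] < 128 { vec![me] } else { vec![[9u8; 32]] }
    });
    assert_eq!(parts.len(), 2);
    assert!(parts[0].retain && !parts[1].retain);
}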