From 94f3d287742ff90f179f528421c690b00b71a912 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 16:54:15 +0100 Subject: [PATCH] WIP big refactoring --- Cargo.lock | 3 +- src/garage/repair.rs | 29 +- src/model/block.rs | 5 +- src/model/garage.rs | 4 +- src/model/object_table.rs | 2 +- src/model/version_table.rs | 2 +- src/table/Cargo.toml | 1 - src/table/data.rs | 189 ++++++++++++ src/table/lib.rs | 4 +- src/table/merkle.rs | 39 ++- .../fullcopy.rs} | 2 +- src/table/replication/mod.rs | 6 + src/table/replication/parameters.rs | 22 ++ .../sharded.rs} | 2 +- src/table/table.rs | 273 ++++-------------- src/table/table_sync.rs | 129 +++++---- 16 files changed, 387 insertions(+), 325 deletions(-) create mode 100644 src/table/data.rs rename src/table/{table_fullcopy.rs => replication/fullcopy.rs} (98%) create mode 100644 src/table/replication/mod.rs create mode 100644 src/table/replication/parameters.rs rename src/table/{table_sharded.rs => replication/sharded.rs} (98%) diff --git a/Cargo.lock b/Cargo.lock index 6d4cc6a3..45244b8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "aho-corasick" version = "0.7.15" @@ -583,7 +585,6 @@ dependencies = [ name = "garage_table" version = "0.1.1" dependencies = [ - "arc-swap", "async-trait", "bytes 0.4.12", "futures", diff --git a/src/garage/repair.rs b/src/garage/repair.rs index e330f7bb..f9cd5884 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -28,38 +28,23 @@ impl Repair { self.garage .bucket_table .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + .add_full_scan(); self.garage .object_table .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + .add_full_scan(); self.garage .version_table .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + .add_full_scan(); self.garage .block_ref_table .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + .add_full_scan(); self.garage .key_table .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; + .add_full_scan(); } // TODO: wait for full sync to finish before proceeding to the rest? @@ -93,7 +78,7 @@ impl Repair { async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + while let Some((item_key, item_bytes)) = self.garage.version_table.data.store.get_gt(&pos)? { pos = item_key.to_vec(); let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; @@ -141,7 +126,7 @@ impl Repair { async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { let mut pos = vec![]; - while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + while let Some((item_key, item_bytes)) = self.garage.block_ref_table.data.store.get_gt(&pos)? { pos = item_key.to_vec(); let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; diff --git a/src/model/block.rs b/src/model/block.rs index ba5544a3..987ec9e4 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -19,8 +19,7 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::table_sharded::TableShardedReplication; -use garage_table::TableReplication; +use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use crate::block_ref_table::*; @@ -412,7 +411,7 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let mut last_hash = None; let mut i = 0usize; - for entry in garage.block_ref_table.store.iter() { + for entry in garage.block_ref_table.data.store.iter() { let (_k, v_bytes) = entry?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; if Some(&block_ref.block) == last_hash.as_ref() { diff --git a/src/model/garage.rs b/src/model/garage.rs index aac79a85..193c71d2 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::table_fullcopy::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; +use garage_table::replication::fullcopy::*; use garage_table::*; use crate::block::*; diff --git a/src/model/object_table.rs b/src/model/object_table.rs index cd09f678..99fad3ce 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -6,7 +6,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::version_table::*; diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 7ccc6a33..cdc73a85 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -5,7 +5,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::table_sharded::*; +use garage_table::replication::sharded::*; use garage_table::*; use crate::block_ref_table::*; diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6485a542..6b3aaceb 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -19,7 +19,6 @@ garage_rpc = { version = "0.1.1", path = "../rpc" } bytes = "0.4" rand = "0.7" hex = "0.3" -arc-swap = "0.4" log = "0.4" hexdump = "0.1" diff --git a/src/table/data.rs b/src/table/data.rs new file mode 100644 index 00000000..fa89fc27 --- /dev/null +++ b/src/table/data.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; + +use log::warn; +use sled::Transactional; +use serde_bytes::ByteBuf; + +use garage_util::data::*; +use garage_util::error::*; +use garage_util::background::BackgroundRunner; + +use crate::schema::*; +use crate::merkle::*; +use crate::crdt::CRDT; + +pub struct TableData { + pub name: String, + pub instance: F, + + pub store: sled::Tree, + pub(crate) merkle_updater: Arc, +} + +impl TableData where F: TableSchema { + pub fn new( + name: String, + instance: F, + db: &sled::Db, + background: Arc, + ) -> Arc { + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); + + let merkle_updater = MerkleUpdater::launch( + name.clone(), + background, + merkle_todo_store, + merkle_tree_store, + ); + + Arc::new(Self{ + name, + instance, + store, + merkle_updater, + }) + } + + // Read functions + + pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { + let tree_key = self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + pub fn read_range( + &self, + p: &F::P, + s: &Option, + filter: &Option, + limit: usize, + ) -> Result>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = self.decode_entry(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + // Mutation functions + + pub(crate) fn update_many(&self, entries: &[Arc]) -> Result<(), Error> { + for update_bytes in entries.iter() { + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + + pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; + db.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) + } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + //self.syncer.load_full().unwrap().invalidate(&tree_key[..]); + } + + Ok(()) + } + + pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { + if let Some(cur_v) = txn.get(k)? { + if cur_v == v { + txn.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } + } + Ok(false) + })?; + + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None); + //self.syncer.load_full().unwrap().invalidate(k); + } + Ok(removed) + } + + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } + + pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } + }, + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 62fd30c5..bb249a56 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,9 +8,9 @@ pub mod schema; pub mod util; pub mod merkle; +pub mod replication; +pub mod data; pub mod table; -pub mod table_fullcopy; -pub mod table_sharded; pub mod table_sync; pub use schema::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 50cb90d5..ef197dc8 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -61,7 +61,7 @@ pub enum MerkleNode { } impl MerkleUpdater { - pub(crate) fn new( + pub(crate) fn launch( table_name: String, background: Arc, todo: sled::Tree, @@ -69,22 +69,22 @@ impl MerkleUpdater { ) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - Arc::new(Self { + let ret = Arc::new(Self { table_name, background, todo, todo_notify: Notify::new(), merkle_tree, empty_node_hash, - }) - } + }); - pub(crate) fn launch(self: &Arc) { - let self2 = self.clone(); - self.background.spawn_worker( - format!("Merkle tree updater for {}", self.table_name), - |must_exit: watch::Receiver| self2.updater_loop(must_exit), + let ret2 = ret.clone(); + ret.background.spawn_worker( + format!("Merkle tree updater for {}", ret.table_name), + |must_exit: watch::Receiver| ret2.updater_loop(must_exit), ); + + ret } async fn updater_loop( @@ -156,28 +156,37 @@ impl MerkleUpdater { new_vhash: Option, ) -> ConflictableTransactionResult, Error> { let i = key.prefix.len(); + + // Read node at current position (defined by the prefix stored in key) + // Calculate an update to apply to this node + // This update is an Option<_>, so that it is None if the update is a no-op + // and we can thus skip recalculating and re-storing everything let mutate = match self.read_node_txn(tx, &key)? { MerkleNode::Empty => { if let Some(vhv) = new_vhash { Some(MerkleNode::Leaf(k.to_vec(), vhv)) } else { + // Nothing to do, keep empty node None } } MerkleNode::Intermediate(mut children) => { let key2 = key.next_key(khash); if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { + // Subtree changed, update this node as well if subhash == self.empty_node_hash { intermediate_rm_child(&mut children, key2.prefix[i]); } else { intermediate_set_child(&mut children, key2.prefix[i], subhash); } + if children.len() == 0 { // should not happen warn!("Replacing intermediate node with empty node, should not happen."); Some(MerkleNode::Empty) } else if children.len() == 1 { - // move node down to this level + // We now have a single node (case when the update deleted one of only two + // children). Move that single child to this level of the tree. let key_sub = key.add_byte(children[0].0); let subnode = self.read_node_txn(tx, &key_sub)?; tx.remove(key_sub.encode())?; @@ -186,19 +195,23 @@ impl MerkleUpdater { Some(MerkleNode::Intermediate(children)) } } else { + // Subtree not changed, nothing to do None } } MerkleNode::Leaf(exlf_key, exlf_hash) => { if exlf_key == k { + // This leaf is for the same key that the one we are updating match new_vhash { Some(vhv) if vhv == exlf_hash => None, Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), None => Some(MerkleNode::Empty), } } else { + // This is an only leaf for another key if let Some(vhv) = new_vhash { - // Create two sub-nodes and replace by intermediary node + // Move that other key to a subnode, create another subnode for our + // insertion and replace current node by an intermediary node let (pos1, h1) = { let key2 = key.next_key(blake2sum(&exlf_key[..])); let subhash = @@ -216,6 +229,9 @@ impl MerkleUpdater { intermediate_set_child(&mut int, pos2, h2); Some(MerkleNode::Intermediate(int)) } else { + // Nothing to do, we don't want to insert this value because it is None, + // and we don't want to change the other value because it's for something + // else None } } @@ -263,6 +279,7 @@ impl MerkleUpdater { } } + // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node( &self, k: &MerkleNodeKey, diff --git a/src/table/table_fullcopy.rs b/src/table/replication/fullcopy.rs similarity index 98% rename from src/table/table_fullcopy.rs rename to src/table/replication/fullcopy.rs index c55879d8..a62a6c3c 100644 --- a/src/table/table_fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -4,7 +4,7 @@ use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; -use crate::*; +use crate::replication::*; #[derive(Clone)] pub struct TableFullReplication { diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs new file mode 100644 index 00000000..d43d7f19 --- /dev/null +++ b/src/table/replication/mod.rs @@ -0,0 +1,6 @@ +mod parameters; + +pub mod fullcopy; +pub mod sharded; + +pub use parameters::*; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs new file mode 100644 index 00000000..4607b050 --- /dev/null +++ b/src/table/replication/parameters.rs @@ -0,0 +1,22 @@ +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; + +use garage_util::data::*; + +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn write_quorum(&self, system: &System) -> usize; + fn max_write_errors(&self) -> usize; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; + fn split_points(&self, ring: &Ring) -> Vec; +} diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs similarity index 98% rename from src/table/table_sharded.rs rename to src/table/replication/sharded.rs index 47bdfeaf..42a742cd 100644 --- a/src/table/table_sharded.rs +++ b/src/table/replication/sharded.rs @@ -2,7 +2,7 @@ use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; -use crate::*; +use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { diff --git a/src/table/table.rs b/src/table/table.rs index 0e75754c..a4cb4b24 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use log::warn; - -use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::data::*; use crate::schema::*; use crate::table_sync::*; +use crate::replication::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table { - pub instance: F, - pub replication: R, - - pub name: String, - pub(crate) rpc_client: Arc>>, - +pub struct TableAux { pub system: Arc, - pub store: sled::Tree, - pub syncer: ArcSwapOption>, - merkle_updater: Arc, + pub replication: R, + pub(crate) rpc_client: Arc>>, +} + +pub struct Table { + pub data: Arc>, + pub aux: Arc>, + pub syncer: Arc>, } #[derive(Serialize, Deserialize)] @@ -55,23 +50,6 @@ pub(crate) enum TableRPC { impl RpcMessage for TableRPC {} -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self, system: &System) -> usize; - fn max_write_errors(&self) -> usize; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; - fn split_points(&self, ring: &Ring) -> Vec; -} impl Table where @@ -88,60 +66,51 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc { - let store = db - .open_tree(&format!("{}:table", name)) - .expect("Unable to open DB tree"); - - let merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db - .open_tree(&format!("{}:merkle_tree", name)) - .expect("Unable to open DB Merkle tree tree"); - let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::>(&rpc_path); - let merkle_updater = MerkleUpdater::new( - name.clone(), + let data = TableData::new( + name, + instance, + db, system.background.clone(), - merkle_todo_store, - merkle_tree_store, + ); + + let aux = Arc::new(TableAux{ + system, + replication, + rpc_client, + }); + + let syncer = TableSyncer::launch( + data.clone(), + aux.clone(), ); let table = Arc::new(Self { - instance, - replication, - name, - rpc_client, - system, - store, - syncer: ArcSwapOption::from(None), - merkle_updater, + data, + aux, + syncer, }); + table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()); - table.syncer.swap(Some(syncer)); - - table.merkle_updater.launch(); - table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::::Update(vec![e_enc]); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -153,7 +122,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -166,7 +135,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::>(); @@ -177,7 +146,7 @@ where errors.push(e); } } - if errors.len() > self.replication.max_write_errors() { + if errors.len() > self.aux.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -190,16 +159,17 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self + .aux .rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -210,7 +180,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = self.decode_entry(v_bytes.as_slice())?; + let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -230,7 +200,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system + self.aux.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -246,16 +216,16 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -266,8 +236,8 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = self.decode_entry(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + let entry = self.data.decode_entry(entry_bytes.as_slice())?; + let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); @@ -287,7 +257,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.system.background.spawn_cancellable(async move { + self.aux.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -306,7 +276,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], TableRPC::::Update(vec![what_enc]), @@ -326,8 +296,8 @@ where }); let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { + self.aux.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -336,157 +306,24 @@ where async fn handle(self: &Arc, msg: &TableRPC) -> Result, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; + let value = self.data.read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs)?; + self.data.update_many(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) + let response = self.syncer + .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option, - filter: &Option, - limit: usize, - ) -> Result>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) { - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = self.decode_entry(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ - - pub fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { - for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; - } - Ok(()) - } - - pub(crate) fn update_entry(self: &Arc, update_bytes: &[u8]) -> Result<(), Error> { - let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) - } else { - Ok(None) - } - })?; - - if let Some((old_entry, new_entry)) = changed { - self.instance.updated(old_entry, Some(new_entry)); - self.syncer.load_full().unwrap().invalidate(&tree_key[..]); - } - - Ok(()) - } - - pub(crate) fn delete_if_equal(self: &Arc, k: &[u8], v: &[u8]) -> Result { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { - if let Some(cur_v) = txn.get(k)? { - if cur_v == v { - txn.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } - } - Ok(false) - })?; - - if removed { - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); - self.syncer.load_full().unwrap().invalidate(k); - } - Ok(removed) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } - - fn decode_entry(&self, bytes: &[u8]) -> Result { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", self.name, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) - } - }, - } - } } diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 51f8cd6f..7394be1b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -16,18 +16,22 @@ use garage_util::data::*; use garage_util::error::Error; use crate::*; +use crate::data::*; +use crate::replication::*; const MAX_DEPTH: usize = 16; + const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -// Scan & sync every 12 hours -const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); +// Do anti-entropy every 10 minutes +const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60); -// Expire cache after 30 minutes -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer { - table: Arc>, + data: Arc>, + aux: Arc>, + todo: Mutex, cache: Vec>>, } @@ -106,10 +110,13 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(table: Arc>) -> Arc { - let todo = SyncTodo { todo: Vec::new() }; - let syncer = Arc::new(TableSyncer { - table: table.clone(), + pub(crate) fn launch(data: Arc>, + aux: Arc>) -> Arc { + let todo = SyncTodo{ todo: vec![] }; + + let syncer = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), todo: Mutex::new(todo), cache: (0..MAX_DEPTH) .map(|_| Mutex::new(BTreeMap::new())) @@ -119,21 +126,21 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - table.system.background.spawn_worker( - format!("table sync watcher for {}", table.name), + aux.system.background.spawn_worker( + format!("table sync watcher for {}", data.name), move |must_exit: watch::Receiver| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); - table.system.background.spawn_worker( - format!("table syncer for {}", table.name), + aux.system.background.spawn_worker( + format!("table syncer for {}", data.name), move |must_exit: watch::Receiver| s2.syncer_task(must_exit, busy_tx), ); let s3 = syncer.clone(); tokio::spawn(async move { tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan().await; + s3.add_full_scan(); }); syncer @@ -144,8 +151,8 @@ where mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, ) -> Result<(), Error> { - let mut prev_ring: Arc = self.table.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver> = self.table.system.ring.clone(); + let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { @@ -158,8 +165,8 @@ where select! { new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.table.name); - self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring); + debug!("({}) Adding ring difference to syncer todo list", self.data.name); + self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux); prev_ring = new_ring; } } @@ -182,8 +189,8 @@ where _ = s_timeout => { if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.table.name); - self.add_full_scan().await; + debug!("({}) Adding full scan to syncer todo list", self.data.name); + self.add_full_scan(); } } } @@ -191,8 +198,8 @@ where Ok(()) } - pub async fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.table); + pub fn add_full_scan(&self) { + self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux); } async fn syncer_task( @@ -211,7 +218,7 @@ where if let Err(e) = res { warn!( "({}) Error while syncing {:?}: {}", - self.table.name, partition, e + self.data.name, partition, e ); } } else { @@ -228,18 +235,18 @@ where must_exit: &mut watch::Receiver, ) -> Result<(), Error> { if partition.retain { - let my_id = self.table.system.id; + let my_id = self.aux.system.id; let nodes = self - .table + .aux .replication - .write_nodes(&partition.begin, &self.table.system) + .write_nodes(&partition.begin, &self.aux.system) .into_iter() .filter(|node| *node != my_id) .collect::>(); debug!( "({}) Preparing to sync {:?} with {:?}...", - self.table.name, partition, nodes + self.data.name, partition, nodes ); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; @@ -259,10 +266,10 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - warn!("({}) Sync error: {}", self.table.name, e); + warn!("({}) Sync error: {}", self.data.name, e); } } - if n_errors > self.table.replication.max_write_errors() { + if n_errors > self.aux.replication.max_write_errors() { return Err(Error::Message(format!( "Sync failed with too many nodes (should have been: {:?}).", nodes @@ -293,7 +300,7 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.table.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { let (key, value) = item?; items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); @@ -304,12 +311,12 @@ where if items.len() > 0 { let nodes = self - .table + .aux .replication - .write_nodes(&begin, &self.table.system) + .write_nodes(&begin, &self.aux.system) .into_iter() .collect::>(); - if nodes.contains(&self.table.system.id) { + if nodes.contains(&self.aux.system.id) { warn!("Interrupting offload as partitions seem to have changed"); break; } @@ -340,7 +347,7 @@ where let update_msg = Arc::new(TableRPC::::Update(values)); for res in join_all(nodes.iter().map(|to| { - self.table + self.aux .rpc_client .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) })) @@ -352,7 +359,7 @@ where // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; for (k, v) in items.iter() { - if !self.table.delete_if_equal(&k[..], &v[..])? { + if !self.data.delete_if_equal(&k[..], &v[..])? { not_removed += 1; } } @@ -399,7 +406,7 @@ where if range.level == 1 { let mut children = vec![]; for item in self - .table + .data .store .range(range.begin.clone()..range.end.clone()) { @@ -516,7 +523,7 @@ where let v = self.range_checksum(&range, must_exit)?; trace!( "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, + self.data.name, hex::encode(&range.begin) .chars() .take(16) @@ -553,7 +560,7 @@ where // If their root checksum has level > than us, use that as a reference let root_cks_resp = self - .table + .aux .rpc_client .call( who, @@ -582,7 +589,7 @@ where let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); trace!( "({}) Sync with {:?}: {} ({}) remaining", - self.table.name, + self.data.name, who, todo.len(), total_children @@ -592,7 +599,7 @@ where let step = todo.drain(..step_size).collect::>(); let rpc_resp = self - .table + .aux .rpc_client .call( who, @@ -606,7 +613,7 @@ where if diff_ranges.len() > 0 || diff_items.len() > 0 { info!( "({}) Sync with {:?}: difference {} ranges, {} items", - self.table.name, + self.data.name, who, diff_ranges.len(), diff_items.len() @@ -622,7 +629,7 @@ where } } if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..])?; + self.data.update_many(&diff_items[..])?; } if items_to_send.len() > 0 { self.send_items(who, items_to_send).await?; @@ -640,19 +647,19 @@ where async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", - self.table.name, + self.data.name, item_list.len(), who ); let mut values = vec![]; for item in item_list.iter() { - if let Some(v) = self.table.store.get(&item[..])? { + if let Some(v) = self.data.store.get(&item[..])? { values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } let rpc_resp = self - .table + .aux .rpc_client .call(who, TableRPC::::Update(values), TABLE_SYNC_RPC_TIMEOUT) .await?; @@ -714,7 +721,7 @@ where ret_ranges.push(their_range.clone()); if their_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(their_range.begin.as_slice())? + self.data.store.get(their_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -738,7 +745,7 @@ where } if our_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(our_range.begin.as_slice())? + self.data.store.get(our_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -753,7 +760,7 @@ where if ret_ranges.len() > 0 || ret_items.len() > 0 { trace!( "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.table.name, + self.data.name, ret_ranges.len(), ret_items.len(), n_checksums @@ -782,13 +789,13 @@ where } impl SyncTodo { - fn add_full_scan(&mut self, table: &Table) { - let my_id = table.system.id; + fn add_full_scan(&mut self, data: &TableData, aux: &TableAux) { + let my_id = aux.system.id; self.todo.clear(); - let ring = table.system.ring.borrow().clone(); - let split_points = table.replication.split_points(&ring); + let ring = aux.system.ring.borrow().clone(); + let split_points = aux.replication.split_points(&ring); for i in 0..split_points.len() - 1 { let begin = split_points[i]; @@ -797,12 +804,12 @@ impl SyncTodo { continue; } - let nodes = table.replication.replication_nodes(&begin, &ring); + let nodes = aux.replication.replication_nodes(&begin, &ring); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if table.store.range(begin..end).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } @@ -813,25 +820,25 @@ impl SyncTodo { fn add_ring_difference( &mut self, - table: &Table, old_ring: &Ring, new_ring: &Ring, + data: &TableData, aux: &TableAux, ) { - let my_id = table.system.id; + let my_id = aux.system.id; // If it is us who are entering or leaving the system, // initiate a full sync instead of incremental sync if old_ring.config.members.contains_key(&my_id) != new_ring.config.members.contains_key(&my_id) { - self.add_full_scan(table); + self.add_full_scan(data, aux); return; } let mut all_points = None .into_iter() - .chain(table.replication.split_points(old_ring).drain(..)) - .chain(table.replication.split_points(new_ring).drain(..)) + .chain(aux.replication.split_points(old_ring).drain(..)) + .chain(aux.replication.split_points(new_ring).drain(..)) .chain(self.todo.iter().map(|x| x.begin)) .chain(self.todo.iter().map(|x| x.end)) .collect::>(); @@ -845,11 +852,11 @@ impl SyncTodo { for i in 0..all_points.len() - 1 { let begin = all_points[i]; let end = all_points[i + 1]; - let was_ours = table + let was_ours = aux .replication .replication_nodes(&begin, &old_ring) .contains(&my_id); - let is_ours = table + let is_ours = aux .replication .replication_nodes(&begin, &new_ring) .contains(&my_id);