Refactor code

This commit is contained in:
Alex 2021-03-16 11:43:58 +01:00
parent 1d9961e411
commit 515029d026
6 changed files with 175 additions and 167 deletions

View file

@ -463,24 +463,28 @@ impl AdminRpcHandler {
Ok(ret) Ok(ret)
} }
fn gather_table_stats<F: TableSchema, R: TableReplication>( fn gather_table_stats<F, R>(
&self, &self,
to: &mut String, to: &mut String,
t: &Arc<Table<F, R>>, t: &Arc<Table<F, R>>,
_opt: &StatsOpt, _opt: &StatsOpt,
) -> Result<(), Error> { ) -> Result<(), Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
writeln!(to, "\nTable stats for {}", t.data.name).unwrap(); writeln!(to, "\nTable stats for {}", t.data.name).unwrap();
writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
writeln!( writeln!(
to, to,
" Merkle updater todo queue length: {}", " Merkle updater todo queue length: {}",
t.data.merkle_updater.todo_len() t.merkle_updater.todo_len()
) )
.unwrap(); .unwrap();
writeln!( writeln!(
to, to,
" Merkle tree size: {}", " Merkle tree size: {}",
t.data.merkle_updater.merkle_tree_len() t.merkle_updater.merkle_tree_len()
) )
.unwrap(); .unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();

View file

@ -4,62 +4,59 @@ use std::sync::Arc;
use log::warn; use log::warn;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use sled::Transactional; use sled::Transactional;
use tokio::sync::Notify;
use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
use crate::merkle::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
pub struct TableData<F: TableSchema> { pub struct TableData<F: TableSchema, R: TableReplication> {
pub name: String, pub name: String,
pub instance: F,
pub(crate) instance: F,
pub(crate) replication: R,
pub store: sled::Tree, pub store: sled::Tree,
pub(crate) merkle_tree: sled::Tree,
pub(crate) merkle_todo: sled::Tree,
pub(crate) merkle_todo_notify: Notify,
pub(crate) gc_todo: sled::Tree, pub(crate) gc_todo: sled::Tree,
pub merkle_updater: Arc<MerkleUpdater>,
} }
impl<F> TableData<F> impl<F, R> TableData<F, R>
where where
F: TableSchema, F: TableSchema,
R: TableReplication,
{ {
pub fn new( pub fn new(name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> {
name: String,
instance: F,
db: &sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", name)) .open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let merkle_todo_store = db let merkle_tree = db
.open_tree(&format!("{}:merkle_todo", name))
.expect("Unable to open DB Merkle TODO tree");
let merkle_tree_store = db
.open_tree(&format!("{}:merkle_tree", name)) .open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree"); .expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
.open_tree(&format!("{}:merkle_todo", name))
.expect("Unable to open DB Merkle TODO tree");
let gc_todo = db let gc_todo = db
.open_tree(&format!("{}:gc_todo", name)) .open_tree(&format!("{}:gc_todo", name))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
let merkle_updater = MerkleUpdater::launch(
name.clone(),
background,
merkle_todo_store,
merkle_tree_store,
);
Arc::new(Self { Arc::new(Self {
name, name,
instance, instance,
replication,
store, store,
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
gc_todo, gc_todo,
merkle_updater,
}) })
} }
@ -129,8 +126,7 @@ where
let update = self.decode_entry(update_bytes)?; let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let changed = let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
(&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
let (old_entry, new_entry) = match store.get(&tree_key)? { let (old_entry, new_entry) = match store.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = self let old_entry = self
@ -159,7 +155,7 @@ where
if let Some((old_entry, new_entry, new_bytes_hash)) = changed { if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry)); self.instance.updated(old_entry, Some(new_entry));
self.merkle_updater.todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
if is_tombstone { if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
} }
@ -169,8 +165,7 @@ where
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
(&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
if let Some(cur_v) = store.get(k)? { if let Some(cur_v) = store.get(k)? {
if cur_v == v { if cur_v == v {
store.remove(k)?; store.remove(k)?;
@ -184,7 +179,7 @@ where
if removed { if removed {
let old_entry = self.decode_entry(v)?; let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None); self.instance.updated(Some(old_entry), None);
self.merkle_updater.todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} }
Ok(removed) Ok(removed)
} }
@ -194,8 +189,7 @@ where
k: &[u8], k: &[u8],
vhash: Hash, vhash: Hash,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let removed = let removed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
(&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
if let Some(cur_v) = store.get(k)? { if let Some(cur_v) = store.get(k)? {
if blake2sum(&cur_v[..]) == vhash { if blake2sum(&cur_v[..]) == vhash {
store.remove(k)?; store.remove(k)?;
@ -209,7 +203,7 @@ where
if let Some(old_v) = removed { if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?; let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None); self.instance.updated(Some(old_entry), None);
self.merkle_updater.todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
Ok(true) Ok(true)
} else { } else {
Ok(false) Ok(false)

View file

@ -13,20 +13,20 @@ use tokio::sync::watch;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_rpc::membership::System;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use crate::data::*; use crate::data::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::table::*;
const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TableGC<F: TableSchema, R: TableReplication> { pub struct TableGC<F: TableSchema, R: TableReplication> {
data: Arc<TableData<F>>, data: Arc<TableData<F, R>>,
aux: Arc<TableAux<R>>, system: Arc<System>,
rpc_client: Arc<RpcClient<GcRPC>>, rpc_client: Arc<RpcClient<GcRPC>>,
} }
@ -46,23 +46,23 @@ where
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
pub(crate) fn launch( pub(crate) fn launch(
data: Arc<TableData<F>>, data: Arc<TableData<F, R>>,
aux: Arc<TableAux<R>>, system: Arc<System>,
rpc_server: &mut RpcServer, rpc_server: &mut RpcServer,
) -> Arc<Self> { ) -> Arc<Self> {
let rpc_path = format!("table_{}/gc", data.name); let rpc_path = format!("table_{}/gc", data.name);
let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path); let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
let gc = Arc::new(Self { let gc = Arc::new(Self {
data: data.clone(), data: data.clone(),
aux: aux.clone(), system: system.clone(),
rpc_client, rpc_client,
}); });
gc.register_handler(rpc_server, rpc_path); gc.register_handler(rpc_server, rpc_path);
let gc1 = gc.clone(); let gc1 = gc.clone();
aux.system.background.spawn_worker( system.background.spawn_worker(
format!("GC loop for {}", data.name), format!("GC loop for {}", data.name),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
); );
@ -130,8 +130,8 @@ where
let mut partitions = HashMap::new(); let mut partitions = HashMap::new();
for (k, vhash, v) in entries { for (k, vhash, v) in entries {
let pkh = Hash::try_from(&k[..32]).unwrap(); let pkh = Hash::try_from(&k[..32]).unwrap();
let mut nodes = self.aux.replication.write_nodes(&pkh); let mut nodes = self.data.replication.write_nodes(&pkh);
nodes.retain(|x| *x != self.aux.system.id); nodes.retain(|x| *x != self.system.id);
nodes.sort(); nodes.sort();
if !partitions.contains_key(&nodes) { if !partitions.contains_key(&nodes) {
@ -220,7 +220,7 @@ where
let self2 = self.clone(); let self2 = self.clone();
self.rpc_client self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| { .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone(); let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await } async move { self2.handle_rpc(&msg).await }
}); });

View file

@ -9,12 +9,16 @@ use serde::{Deserialize, Serialize};
use sled::transaction::{ use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
}; };
use tokio::sync::{watch, Notify}; use tokio::sync::watch;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use crate::data::*;
use crate::replication::*;
use crate::schema::*;
pub type MerklePartition = [u8; 2]; pub type MerklePartition = [u8; 2];
pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
@ -32,28 +36,30 @@ pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
// 16 bits (two bytes) of item's partition keys' hashes. // 16 bits (two bytes) of item's partition keys' hashes.
// It builds one Merkle tree for each of these 2**16 partitions. // It builds one Merkle tree for each of these 2**16 partitions.
pub struct MerkleUpdater { pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
table_name: String, data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
// Content of the todo tree: items where // Content of the todo tree: items where
// - key = the key of an item in the main table, ie hash(partition_key)+sort_key // - key = the key of an item in the main table, ie hash(partition_key)+sort_key
// - value = the hash of the full serialized item, if present, // - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted) // or an empty vec if item is absent (deleted)
pub(crate) todo: sled::Tree, // Fields in data:
pub(crate) todo_notify: Notify, // pub(crate) merkle_todo: sled::Tree,
// pub(crate) merkle_todo_notify: Notify,
// Content of the merkle tree: items where // Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey // - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
pub(crate) merkle_tree: sled::Tree, // Field in data:
// pub(crate) merkle_tree: sled::Tree,
empty_node_hash: Hash, empty_node_hash: Hash,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MerkleNodeKey { pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash // partition: first 16 bits (two bytes) of the partition_key's hash
pub partition: MerklePartition, pub partition: [u8; 2],
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
@ -74,27 +80,26 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash), Leaf(Vec<u8>, Hash),
} }
impl MerkleUpdater { impl<F, R> MerkleUpdater<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch( pub(crate) fn launch(
table_name: String, data: Arc<TableData<F, R>>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
todo: sled::Tree,
merkle_tree: sled::Tree,
) -> Arc<Self> { ) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
let ret = Arc::new(Self { let ret = Arc::new(Self {
table_name, data,
background, background,
todo,
todo_notify: Notify::new(),
merkle_tree,
empty_node_hash, empty_node_hash,
}); });
let ret2 = ret.clone(); let ret2 = ret.clone();
ret.background.spawn_worker( ret.background.spawn_worker(
format!("Merkle tree updater for {}", ret.table_name), format!("Merkle tree updater for {}", ret.data.name),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
); );
@ -103,27 +108,27 @@ impl MerkleUpdater {
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() { while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() { if let Some(x) = self.data.merkle_todo.iter().next() {
match x { match x {
Ok((key, valhash)) => { Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) { if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!( warn!(
"({}) Error while updating Merkle tree item: {}", "({}) Error while updating Merkle tree item: {}",
self.table_name, e self.data.name, e
); );
} }
} }
Err(e) => { Err(e) => {
warn!( warn!(
"({}) Error while iterating on Merkle todo tree: {}", "({}) Error while iterating on Merkle todo tree: {}",
self.table_name, e self.data.name, e
); );
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
} }
} }
} else { } else {
select! { select! {
_ = self.todo_notify.notified().fuse() => (), _ = self.data.merkle_todo_notify.notified().fuse() => (),
_ = must_exit.changed().fuse() => (), _ = must_exit.changed().fuse() => (),
} }
} }
@ -143,18 +148,20 @@ impl MerkleUpdater {
partition: k[0..2].try_into().unwrap(), partition: k[0..2].try_into().unwrap(),
prefix: vec![], prefix: vec![],
}; };
self.merkle_tree self.data
.merkle_tree
.transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?;
let deleted = self let deleted = self
.todo .data
.merkle_todo
.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
.is_ok(); .is_ok();
if !deleted { if !deleted {
debug!( debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}", "({}) Item not deleted from Merkle todo because it changed: {:?}",
self.table_name, k self.data.name, k
); );
} }
Ok(()) Ok(())
@ -197,7 +204,7 @@ impl MerkleUpdater {
// should not happen // should not happen
warn!( warn!(
"({}) Replacing intermediate node with empty node, should not happen.", "({}) Replacing intermediate node with empty node, should not happen.",
self.table_name self.data.name
); );
Some(MerkleNode::Empty) Some(MerkleNode::Empty)
} else if children.len() == 1 { } else if children.len() == 1 {
@ -301,7 +308,7 @@ impl MerkleUpdater {
// Access a node in the Merkle tree, used by the sync protocol // Access a node in the Merkle tree, used by the sync protocol
pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.merkle_tree.get(k.encode())?; let ent = self.data.merkle_tree.get(k.encode())?;
match ent { match ent {
None => Ok(MerkleNode::Empty), None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
@ -309,11 +316,11 @@ impl MerkleUpdater {
} }
pub fn merkle_tree_len(&self) -> usize { pub fn merkle_tree_len(&self) -> usize {
self.merkle_tree.len() self.data.merkle_tree.len()
} }
pub fn todo_len(&self) -> usize { pub fn todo_len(&self) -> usize {
self.todo.len() self.data.merkle_todo.len()
} }
} }

View file

@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_rpc::membership::System;
use garage_rpc::ring::Ring; use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
@ -29,8 +30,9 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
pub struct TableSyncer<F: TableSchema, R: TableReplication> { pub struct TableSyncer<F: TableSchema, R: TableReplication> {
data: Arc<TableData<F>>, system: Arc<System>,
aux: Arc<TableAux<R>>, data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>, todo: Mutex<SyncTodo>,
rpc_client: Arc<RpcClient<SyncRPC>>, rpc_client: Arc<RpcClient<SyncRPC>>,
@ -76,18 +78,20 @@ where
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
pub(crate) fn launch( pub(crate) fn launch(
data: Arc<TableData<F>>, system: Arc<System>,
aux: Arc<TableAux<R>>, data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
rpc_server: &mut RpcServer, rpc_server: &mut RpcServer,
) -> Arc<Self> { ) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name); let rpc_path = format!("table_{}/sync", data.name);
let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path); let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
let todo = SyncTodo { todo: vec![] }; let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self { let syncer = Arc::new(Self {
system: system.clone(),
data: data.clone(), data: data.clone(),
aux: aux.clone(), merkle,
todo: Mutex::new(todo), todo: Mutex::new(todo),
rpc_client, rpc_client,
}); });
@ -97,13 +101,13 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone(); let s1 = syncer.clone();
aux.system.background.spawn_worker( system.background.spawn_worker(
format!("table sync watcher for {}", data.name), format!("table sync watcher for {}", data.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
); );
let s2 = syncer.clone(); let s2 = syncer.clone();
aux.system.background.spawn_worker( system.background.spawn_worker(
format!("table syncer for {}", data.name), format!("table syncer for {}", data.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
); );
@ -126,7 +130,7 @@ where
let self2 = self.clone(); let self2 = self.clone();
self.rpc_client self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| { .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone(); let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await } async move { self2.handle_rpc(&msg).await }
}); });
@ -137,8 +141,8 @@ where
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>, mut busy_rx: mpsc::UnboundedReceiver<bool>,
) { ) {
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now()); let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() { while !*must_exit.borrow() {
@ -178,7 +182,7 @@ where
self.todo self.todo
.lock() .lock()
.unwrap() .unwrap()
.add_full_sync(&self.data, &self.aux); .add_full_sync(&self.data, &self.system);
} }
async fn syncer_task( async fn syncer_task(
@ -213,10 +217,10 @@ where
must_exit: &mut watch::Receiver<bool>, must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
if partition.retain { if partition.retain {
let my_id = self.aux.system.id; let my_id = self.system.id;
let nodes = self let nodes = self
.aux .data
.replication .replication
.write_nodes(&hash_of_merkle_partition(partition.range.begin)) .write_nodes(&hash_of_merkle_partition(partition.range.begin))
.into_iter() .into_iter()
@ -242,7 +246,7 @@ where
warn!("({}) Sync error: {}", self.data.name, e); warn!("({}) Sync error: {}", self.data.name, e);
} }
} }
if n_errors > self.aux.replication.max_write_errors() { if n_errors > self.data.replication.max_write_errors() {
return Err(Error::Message(format!( return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).", "Sync failed with too many nodes (should have been: {:?}).",
nodes nodes
@ -288,19 +292,19 @@ where
if items.len() > 0 { if items.len() > 0 {
let nodes = self let nodes = self
.aux .data
.replication .replication
.write_nodes(&begin) .write_nodes(&begin)
.into_iter() .into_iter()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if nodes.contains(&self.aux.system.id) { if nodes.contains(&self.system.id) {
warn!( warn!(
"({}) Interrupting offload as partitions seem to have changed", "({}) Interrupting offload as partitions seem to have changed",
self.data.name self.data.name
); );
break; break;
} }
if nodes.len() < self.aux.replication.write_quorum() { if nodes.len() < self.data.replication.write_quorum() {
return Err(Error::Message(format!( return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to." "Not offloading as we don't have a quorum of nodes to write to."
))); )));
@ -376,7 +380,7 @@ where
partition: u16::to_be_bytes(i), partition: u16::to_be_bytes(i),
prefix: vec![], prefix: vec![],
}; };
match self.data.merkle_updater.read_node(&key)? { match self.merkle.read_node(&key)? {
MerkleNode::Empty => (), MerkleNode::Empty => (),
x => { x => {
ret.push((key.partition, hash_of(&x)?)); ret.push((key.partition, hash_of(&x)?));
@ -458,7 +462,7 @@ where
while !todo.is_empty() && !*must_exit.borrow() { while !todo.is_empty() && !*must_exit.borrow() {
let key = todo.pop_front().unwrap(); let key = todo.pop_front().unwrap();
let node = self.data.merkle_updater.read_node(&key)?; let node = self.merkle.read_node(&key)?;
match node { match node {
MerkleNode::Empty => { MerkleNode::Empty => {
@ -570,7 +574,7 @@ where
} }
} }
SyncRPC::GetNode(k) => { SyncRPC::GetNode(k) => {
let node = self.data.merkle_updater.read_node(&k)?; let node = self.merkle.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node)) Ok(SyncRPC::Node(k.clone(), node))
} }
SyncRPC::Items(items) => { SyncRPC::Items(items) => {
@ -585,15 +589,15 @@ where
impl SyncTodo { impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>( fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self, &mut self,
data: &TableData<F>, data: &TableData<F, R>,
aux: &TableAux<R>, system: &System,
) { ) {
let my_id = aux.system.id; let my_id = system.id;
self.todo.clear(); self.todo.clear();
let ring = aux.system.ring.borrow().clone(); let ring = system.ring.borrow().clone();
let split_points = aux.replication.split_points(&ring); let split_points = data.replication.split_points(&ring);
for i in 0..split_points.len() { for i in 0..split_points.len() {
let begin: MerklePartition = { let begin: MerklePartition = {
@ -613,7 +617,7 @@ impl SyncTodo {
let begin_hash = hash_of_merkle_partition(begin); let begin_hash = hash_of_merkle_partition(begin);
let end_hash = hash_of_merkle_partition_opt(end); let end_hash = hash_of_merkle_partition_opt(end);
let nodes = aux.replication.write_nodes(&begin_hash); let nodes = data.replication.write_nodes(&begin_hash);
let retain = nodes.contains(&my_id); let retain = nodes.contains(&my_id);
if !retain { if !retain {

View file

@ -16,20 +16,17 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
use crate::data::*; use crate::data::*;
use crate::gc::*; use crate::gc::*;
use crate::merkle::*;
use crate::replication::*; use crate::replication::*;
use crate::schema::*; use crate::schema::*;
use crate::sync::*; use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct TableAux<R: TableReplication> {
pub system: Arc<System>,
pub replication: R,
}
pub struct Table<F: TableSchema, R: TableReplication> { pub struct Table<F: TableSchema, R: TableReplication> {
pub data: Arc<TableData<F>>, pub system: Arc<System>,
pub aux: Arc<TableAux<R>>, pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>, pub syncer: Arc<TableSyncer<F, R>>,
rpc_client: Arc<RpcClient<TableRPC<F>>>, rpc_client: Arc<RpcClient<TableRPC<F>>>,
} }
@ -67,19 +64,22 @@ where
let rpc_path = format!("table_{}", name); let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
let data = TableData::new(name, instance, db, system.background.clone()); let data = TableData::new(name, instance, replication, db);
let aux = Arc::new(TableAux { let merkle_updater = MerkleUpdater::launch(data.clone(), system.background.clone());
system,
replication,
});
let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server); let syncer = TableSyncer::launch(
TableGC::launch(data.clone(), aux.clone(), rpc_server); system.clone(),
data.clone(),
merkle_updater.clone(),
rpc_server,
);
TableGC::launch(data.clone(), system.clone(), rpc_server);
let table = Arc::new(Self { let table = Arc::new(Self {
system,
data, data,
aux, merkle_updater,
syncer, syncer,
rpc_client, rpc_client,
}); });
@ -91,7 +91,7 @@ where
pub async fn insert(&self, e: &F::E) -> Result<(), Error> { pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash(); let hash = e.partition_key().hash();
let who = self.aux.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who); //eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@ -101,7 +101,7 @@ where
.try_call_many( .try_call_many(
&who[..], &who[..],
rpc, rpc,
RequestStrategy::with_quorum(self.aux.replication.write_quorum()) RequestStrategy::with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT), .with_timeout(TABLE_RPC_TIMEOUT),
) )
.await?; .await?;
@ -113,7 +113,7 @@ where
for entry in entries.iter() { for entry in entries.iter() {
let hash = entry.partition_key().hash(); let hash = entry.partition_key().hash();
let who = self.aux.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who { for node in who {
if !call_list.contains_key(&node) { if !call_list.contains_key(&node) {
@ -137,7 +137,7 @@ where
errors.push(e); errors.push(e);
} }
} }
if errors.len() > self.aux.replication.max_write_errors() { if errors.len() > self.data.replication.max_write_errors() {
Err(Error::Message("Too many errors".into())) Err(Error::Message("Too many errors".into()))
} else { } else {
Ok(()) Ok(())
@ -150,7 +150,7 @@ where
sort_key: &F::S, sort_key: &F::S,
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash(); let hash = partition_key.hash();
let who = self.aux.replication.read_nodes(&hash); let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who); //eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@ -159,7 +159,7 @@ where
.try_call_many( .try_call_many(
&who[..], &who[..],
rpc, rpc,
RequestStrategy::with_quorum(self.aux.replication.read_quorum()) RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT) .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true), .interrupt_after_quorum(true),
) )
@ -190,8 +190,7 @@ where
if not_all_same { if not_all_same {
let self2 = self.clone(); let self2 = self.clone();
let ent2 = ret_entry.clone(); let ent2 = ret_entry.clone();
self.aux self.system
.system
.background .background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
} }
@ -207,7 +206,7 @@ where
limit: usize, limit: usize,
) -> Result<Vec<F::E>, Error> { ) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash(); let hash = partition_key.hash();
let who = self.aux.replication.read_nodes(&hash); let who = self.data.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
@ -216,7 +215,7 @@ where
.try_call_many( .try_call_many(
&who[..], &who[..],
rpc, rpc,
RequestStrategy::with_quorum(self.aux.replication.read_quorum()) RequestStrategy::with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT) .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true), .interrupt_after_quorum(true),
) )
@ -248,7 +247,7 @@ where
} }
if !to_repair.is_empty() { if !to_repair.is_empty() {
let self2 = self.clone(); let self2 = self.clone();
self.aux.system.background.spawn_cancellable(async move { self.system.background.spawn_cancellable(async move {
for (_, v) in to_repair.iter_mut() { for (_, v) in to_repair.iter_mut() {
self2.repair_on_read(&who[..], v.take().unwrap()).await?; self2.repair_on_read(&who[..], v.take().unwrap()).await?;
} }
@ -288,7 +287,7 @@ where
let self2 = self.clone(); let self2 = self.clone();
self.rpc_client self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| { .set_local_handler(self.system.id, move |msg| {
let self2 = self2.clone(); let self2 = self2.clone();
async move { self2.handle(&msg).await } async move { self2.handle(&msg).await }
}); });