Make syncer have its own rpc client/server

This commit is contained in:
Alex 2021-03-12 15:05:26 +01:00
parent 1fea257291
commit 8860aa19b8
3 changed files with 73 additions and 30 deletions

View file

@ -109,11 +109,17 @@ impl MerkleUpdater {
match x { match x {
Ok((key, valhash)) => { Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) { if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!("({}) Error while updating Merkle tree item: {}", self.table_name, e); warn!(
"({}) Error while updating Merkle tree item: {}",
self.table_name, e
);
} }
} }
Err(e) => { Err(e) => {
warn!("({}) Error while iterating on Merkle todo tree: {}", self.table_name, e); warn!(
"({}) Error while iterating on Merkle todo tree: {}",
self.table_name, e
);
tokio::time::delay_for(Duration::from_secs(10)).await; tokio::time::delay_for(Duration::from_secs(10)).await;
} }
} }
@ -154,8 +160,7 @@ impl MerkleUpdater {
if !deleted { if !deleted {
debug!( debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}", "({}) Item not deleted from Merkle todo because it changed: {:?}",
self.table_name, self.table_name, k
k
); );
} }
Ok(()) Ok(())
@ -196,7 +201,10 @@ impl MerkleUpdater {
if children.len() == 0 { if children.len() == 0 {
// should not happen // should not happen
warn!("({}) Replacing intermediate node with empty node, should not happen.", self.table_name); warn!(
"({}) Replacing intermediate node with empty node, should not happen.",
self.table_name
);
Some(MerkleNode::Empty) Some(MerkleNode::Empty)
} else if children.len() == 1 { } else if children.len() == 1 {
// We now have a single node (case when the update deleted one of only two // We now have a single node (case when the update deleted one of only two

View file

@ -12,10 +12,13 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use garage_rpc::ring::Ring;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*; use crate::data::*;
use crate::merkle::*; use crate::merkle::*;
use crate::replication::*; use crate::replication::*;
@ -31,6 +34,7 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
aux: Arc<TableAux<F, R>>, aux: Arc<TableAux<F, R>>,
todo: Mutex<SyncTodo>, todo: Mutex<SyncTodo>,
rpc_client: Arc<RpcClient<SyncRPC>>,
} }
type RootCk = Vec<(MerklePartition, Hash)>; type RootCk = Vec<(MerklePartition, Hash)>;
@ -49,8 +53,12 @@ pub(crate) enum SyncRPC {
CkNoDifference, CkNoDifference,
GetNode(MerkleNodeKey), GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode), Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
} }
impl RpcMessage for SyncRPC {}
struct SyncTodo { struct SyncTodo {
todo: Vec<TodoPartition>, todo: Vec<TodoPartition>,
} }
@ -68,15 +76,25 @@ where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> { pub(crate) fn launch(
data: Arc<TableData<F>>,
aux: Arc<TableAux<F, R>>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
let rpc_client = aux.system.rpc_client::<SyncRPC>(&rpc_path);
let todo = SyncTodo { todo: vec![] }; let todo = SyncTodo { todo: vec![] };
let syncer = Arc::new(Self { let syncer = Arc::new(Self {
data: data.clone(), data: data.clone(),
aux: aux.clone(), aux: aux.clone(),
todo: Mutex::new(todo), todo: Mutex::new(todo),
rpc_client,
}); });
syncer.register_handler(rpc_server, rpc_path);
let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone(); let s1 = syncer.clone();
@ -100,6 +118,21 @@ where
syncer syncer
} }
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
let self2 = self.clone();
self.rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
}
async fn watcher_task( async fn watcher_task(
self: Arc<Self>, self: Arc<Self>,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
@ -278,11 +311,16 @@ where
.into_iter() .into_iter()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if nodes.contains(&self.aux.system.id) { if nodes.contains(&self.aux.system.id) {
warn!("({}) Interrupting offload as partitions seem to have changed", self.data.name); warn!(
"({}) Interrupting offload as partitions seem to have changed",
self.data.name
);
break; break;
} }
if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) {
return Err(Error::Message(format!("Not offloading as we don't have a quorum of nodes to write to."))); return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to."
)));
} }
counter += 1; counter += 1;
@ -309,11 +347,10 @@ where
nodes: &[UUID], nodes: &[UUID],
) -> Result<(), Error> { ) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
let update_msg = Arc::new(TableRPC::<F>::Update(values)); let update_msg = Arc::new(SyncRPC::Items(values));
for res in join_all(nodes.iter().map(|to| { for res in join_all(nodes.iter().map(|to| {
self.aux self.rpc_client
.rpc_client
.call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
})) }))
.await .await
@ -380,31 +417,30 @@ where
"({}) Sync {:?} with {:?}: partition is empty.", "({}) Sync {:?} with {:?}: partition is empty.",
self.data.name, partition, who self.data.name, partition, who
); );
return Ok(()) return Ok(());
} }
let root_ck_hash = hash_of(&root_ck)?; let root_ck_hash = hash_of(&root_ck)?;
// If their root checksum has level > than us, use that as a reference // If their root checksum has level > than us, use that as a reference
let root_resp = self let root_resp = self
.aux
.rpc_client .rpc_client
.call( .call(
who, who,
TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)), SyncRPC::RootCkHash(partition.range, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT, TABLE_SYNC_RPC_TIMEOUT,
) )
.await?; .await?;
let mut todo = match root_resp { let mut todo = match root_resp {
TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => { SyncRPC::CkNoDifference => {
debug!( debug!(
"({}) Sync {:?} with {:?}: no difference", "({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who self.data.name, partition, who
); );
return Ok(()); return Ok(());
} }
TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => { SyncRPC::RootCkList(_, their_root_ck) => {
let join = join_ordered(&root_ck[..], &their_root_ck[..]); let join = join_ordered(&root_ck[..], &their_root_ck[..]);
let mut todo = VecDeque::new(); let mut todo = VecDeque::new();
for (p, v1, v2) in join.iter() { for (p, v1, v2) in join.iter() {
@ -464,16 +500,11 @@ where
// Get Merkle node for this tree position at remote node // Get Merkle node for this tree position at remote node
// and compare it with local node // and compare it with local node
let remote_node = match self let remote_node = match self
.aux
.rpc_client .rpc_client
.call( .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
who,
TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
TABLE_SYNC_RPC_TIMEOUT,
)
.await? .await?
{ {
TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node, SyncRPC::Node(_, node) => node,
x => { x => {
return Err(Error::Message(format!( return Err(Error::Message(format!(
"Invalid respone to GetNode RPC: {}", "Invalid respone to GetNode RPC: {}",
@ -525,16 +556,16 @@ where
who who
); );
let values = item_value_list.into_iter() let values = item_value_list
.into_iter()
.map(|x| Arc::new(ByteBuf::from(x))) .map(|x| Arc::new(ByteBuf::from(x)))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let rpc_resp = self let rpc_resp = self
.aux
.rpc_client .rpc_client
.call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
.await?; .await?;
if let TableRPC::<F>::Ok = rpc_resp { if let SyncRPC::Ok = rpc_resp {
Ok(()) Ok(())
} else { } else {
Err(Error::Message(format!( Err(Error::Message(format!(
@ -561,6 +592,10 @@ where
let node = self.data.merkle_updater.read_node(&k)?; let node = self.data.merkle_updater.read_node(&k)?;
Ok(SyncRPC::Node(k.clone(), node)) Ok(SyncRPC::Node(k.clone(), node))
} }
SyncRPC::Items(items) => {
self.data.update_many(items)?;
Ok(SyncRPC::Ok)
}
_ => Err(Error::Message(format!("Unexpected sync RPC"))), _ => Err(Error::Message(format!("Unexpected sync RPC"))),
} }
} }

View file

@ -24,7 +24,7 @@ const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct TableAux<F: TableSchema, R: TableReplication> { pub struct TableAux<F: TableSchema, R: TableReplication> {
pub system: Arc<System>, pub system: Arc<System>,
pub replication: R, pub replication: R,
pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>, rpc_client: Arc<RpcClient<TableRPC<F>>>,
} }
pub struct Table<F: TableSchema, R: TableReplication> { pub struct Table<F: TableSchema, R: TableReplication> {
@ -76,7 +76,7 @@ where
rpc_client, rpc_client,
}); });
let syncer = TableSyncer::launch(data.clone(), aux.clone()); let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
let table = Arc::new(Self { data, aux, syncer }); let table = Arc::new(Self { data, aux, syncer });