rpc helper: small refactorings

This commit is contained in:
Alex 2023-12-08 12:18:12 +01:00
parent 5dd200c015
commit 64a6e557a4
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -436,13 +436,12 @@ impl RpcHelper {
H: StreamingEndpointHandler<M> + 'static, H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static, S: Send + 'static,
{ {
let msg = msg.into_req().map_err(netapp::error::Error::from)?;
// Peers may appear in many quorum sets. Here, build a list of peers, // Peers may appear in many quorum sets. Here, build a list of peers,
// mapping to the index of the quorum sets in which they appear. // mapping to the index of the quorum sets in which they appear.
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum); let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
// Send one request to each peer of the quorum sets // Send one request to each peer of the quorum sets
let msg = msg.into_req().map_err(netapp::error::Error::from)?;
let requests = result_tracker.nodes.iter().map(|(peer, _)| { let requests = result_tracker.nodes.iter().map(|(peer, _)| {
let self2 = self.clone(); let self2 = self.clone();
let msg = msg.clone(); let msg = msg.clone();
@ -523,10 +522,10 @@ impl RpcHelper {
) -> Vec<Uuid> { ) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests // Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list(); let peer_list = self.0.fullmesh.get_peer_list();
let our_zone = match layout.current().node_role(&self.0.our_node_id) { let our_zone = layout
Some(pc) => &pc.zone, .current()
None => "", .get_node_zone(&self.0.our_node_id)
}; .unwrap_or("");
// Augment requests with some information used to sort them. // Augment requests with some information used to sort them.
// The tuples are as follows: // The tuples are as follows:
@ -536,10 +535,7 @@ impl RpcHelper {
// and within a same zone we priorize nodes with the lowest latency. // and within a same zone we priorize nodes with the lowest latency.
let mut nodes = nodes let mut nodes = nodes
.map(|to| { .map(|to| {
let peer_zone = match layout.current().node_role(&to) { let peer_zone = layout.current().get_node_zone(&to).unwrap_or("");
Some(pc) => &pc.zone,
None => "",
};
let peer_avg_ping = peer_list let peer_avg_ping = peer_list
.iter() .iter()
.find(|x| x.id.as_ref() == to.as_slice()) .find(|x| x.id.as_ref() == to.as_slice())
@ -567,21 +563,28 @@ impl RpcHelper {
// ------- utility for tracking successes/errors among write sets -------- // ------- utility for tracking successes/errors among write sets --------
pub struct QuorumSetResultTracker<S, E> { pub struct QuorumSetResultTracker<S, E> {
// The set of nodes and the quorum sets they belong to /// The set of nodes and the index of the quorum sets they belong to
pub nodes: HashMap<Uuid, Vec<usize>>, pub nodes: HashMap<Uuid, Vec<usize>>,
/// The quorum value, i.e. number of success responses to await in each set
pub quorum: usize, pub quorum: usize,
// The success and error responses received /// The success responses received
pub successes: Vec<(Uuid, S)>, pub successes: Vec<(Uuid, S)>,
/// The error responses received
pub failures: Vec<(Uuid, E)>, pub failures: Vec<(Uuid, E)>,
// The counters for successes and failures in each set /// The counters for successes in each set
pub success_counters: Box<[usize]>, pub success_counters: Box<[usize]>,
/// The counters for failures in each set
pub failure_counters: Box<[usize]>, pub failure_counters: Box<[usize]>,
/// The total number of nodes in each set
pub set_lens: Box<[usize]>, pub set_lens: Box<[usize]>,
} }
impl<S, E: std::fmt::Display> QuorumSetResultTracker<S, E> { impl<S, E> QuorumSetResultTracker<S, E>
where
E: std::fmt::Display,
{
pub fn new<A>(sets: &[A], quorum: usize) -> Self pub fn new<A>(sets: &[A], quorum: usize) -> Self
where where
A: AsRef<[Uuid]>, A: AsRef<[Uuid]>,