add doc comments #53

Merged
lx merged 10 commits from trinity-1686a/garage:doc-comments into main 2021-04-08 13:01:22 +00:00
6 changed files with 100 additions and 10 deletions
Showing only changes of commit 7d3a951836


@@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
     #[structopt(short = "c", long = "capacity")]
     capacity: Option<u32>,
-    /// Optionnal node tag
+    /// Optional node tag
     #[structopt(short = "t", long = "tag")]
     tag: Option<String>,


@@ -1,7 +1,10 @@
+#![deny(missing_crate_level_docs, missing_docs)]
+//! Crate containing rpc related functions and types used in Garage
 #[macro_use]
 extern crate log;
-pub mod consul;
+mod consul;
 pub(crate) mod tls_util;
 pub mod membership;


@@ -1,3 +1,4 @@
+/// Module containing structs related to membership management
trinity-1686a marked this conversation as resolved:
> `//!`
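The `//!` suggestion is about Rust's two doc-comment forms: `///` documents the item that follows it, while `//!` documents the enclosing item, which is what a module-level description needs. A minimal illustration, reusing two lines from this diff:

```rust
//! Module containing structs related to membership management
// ^ inner doc comment: attaches to the enclosing module or crate,
//   so it must appear before any item in the file

/// RPC endpoint used for calls related to membership
// ^ outer doc comment: attaches to the item directly below it
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
```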
 use std::collections::HashMap;
 use std::fmt::Write as FmtWrite;
 use std::io::{Read, Write};
@@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
 const PING_TIMEOUT: Duration = Duration::from_secs(2);
 const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
 
+/// RPC endpoint used for calls related to membership
 pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
 
+/// RPC messages related to membership
 #[derive(Debug, Serialize, Deserialize)]
 pub enum Message {
+    /// Response to successful advertisements
     Ok,
+    /// Message sent to detect other nodes' status
     Ping(PingMessage),
+    /// Ask another node for the nodes it knows of. Answered with AdvertiseNodesUp
     PullStatus,
+    /// Ask another node for its config. Answered with AdvertiseConfig
     PullConfig,
+    /// Advertisement of nodes the host knows to be up. Sent spontaneously or in response to PullStatus
     AdvertiseNodesUp(Vec<AdvertisedNode>),
+    /// Advertisement of the nodes' config. Sent spontaneously or in response to PullConfig
     AdvertiseConfig(NetworkConfig),
 }
 
 impl RpcMessage for Message {}
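For orientation, here is a self-contained sketch of the request/response pairing these doc comments describe. The types are reduced stand-ins (`u64` replaces Garage's UUID, PingMessage, and NetworkConfig), and the real handler in membership.rs does much more (status merging, hash comparison, etc.):

```rust
// Reduced model of the membership protocol documented above.
#[derive(Debug)]
pub enum Msg {
    Ok,
    Ping(u64),
    PullStatus,
    PullConfig,
    AdvertiseNodesUp(Vec<u64>),
    AdvertiseConfig(u64),
}

// How a node would answer each request, per the doc comments.
pub fn answer(msg: &Msg, known_up: &[u64], config_version: u64) -> Msg {
    match msg {
        Msg::Ping(_) => Msg::Ok, // a ping is acknowledged with Ok
        Msg::PullStatus => Msg::AdvertiseNodesUp(known_up.to_vec()),
        Msg::PullConfig => Msg::AdvertiseConfig(config_version),
        // advertisements (sent spontaneously or as answers) are acknowledged with Ok
        _ => Msg::Ok,
    }
}
```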
+/// A ping, containing information about status and config
 #[derive(Debug, Serialize, Deserialize)]
 pub struct PingMessage {
     id: UUID,
@@ -55,18 +65,25 @@ pub struct PingMessage {
     state_info: StateInfo,
 }
 
+/// A node advertisement
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct AdvertisedNode {
+    /// Id of the node this advertisement relates to
     pub id: UUID,
+    /// IP and port of the node
     pub addr: SocketAddr,
+    /// Is the node considered up
     pub is_up: bool,
+    /// When was the node last seen up, in milliseconds since UNIX epoch
     pub last_seen: u64,
     pub state_info: StateInfo,
 }
 
+/// This node's membership manager
 pub struct System {
+    /// The id of this node
     pub id: UUID,
     persist_config: Persister<NetworkConfig>,
@@ -79,10 +96,12 @@ pub struct System {
     rpc_client: Arc<RpcClient<Message>>,
 
     pub(crate) status: watch::Receiver<Arc<Status>>,
trinity-1686a marked this conversation as resolved:
> not "viewed by this node", all nodes should have equal rings (if not it's a bug)
+    /// The ring, viewed by this node
     pub ring: watch::Receiver<Arc<Ring>>,
     update_lock: Mutex<Updaters>,
+    /// The job runner of this node
     pub background: Arc<BackgroundRunner>,
 }
@@ -91,21 +110,30 @@ struct Updaters {
     update_ring: watch::Sender<Arc<Ring>>,
 }
lx marked this conversation as resolved:
> when is it sorted?
> it's sorted in recalculate_hash. Actually the node list is short, so sorting it is pretty much free
> That's my reasoning as well.
+/// The status of each node, viewed by this node
trinity-1686a marked this conversation as resolved:
> hash of the whole set of currently known nodes. This hash is sent in regular ping messages so nodes can detect when they have different views of the cluster. They then exchange their peer lists kind of in an anti-entropy process.
 #[derive(Debug, Clone)]
 pub struct Status {
+    /// Mapping of each node id to its known status
+    // considering it's sorted regularly, maybe it should be a BTreeSet?
     pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+    /// Hash of this entry
     pub hash: Hash,
 }
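The two threads above explain `hash`: it is a digest of the whole known-node set, recomputed over a sorted node list (in recalculate_hash) and exchanged in pings, so nodes with divergent views detect the mismatch and swap peer lists, anti-entropy style. A standalone sketch of that idea, with std's `DefaultHasher` and bare byte-array ids standing in for Garage's own types:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: Garage digests its node data with its own Hash type;
// DefaultHasher and the [u8; 32] ids are stand-ins.
fn status_hash(mut node_ids: Vec<[u8; 32]>) -> u64 {
    // Sorting makes the digest independent of HashMap iteration order;
    // as noted in the thread, the node list is short, so this is cheap.
    node_ids.sort_unstable();
    let mut hasher = DefaultHasher::new();
    for id in &node_ids {
        id.hash(&mut hasher);
    }
    hasher.finish()
}

// Two nodes comparing hashes carried in a ping detect divergent views:
// if the hashes differ, they exchange their peer lists.
```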
+/// The status of a single node
 #[derive(Debug)]
Review thread (outdated):
> Not sure but this counts failed requests of any kind, and not just ping messages. It is incremented in rpc_client.rs
> hum, it might double-count pings, as it counts once in rpc_client, and once in this file in ping_nodes
> Thanks for noticing that, I'll have to check.
> Ok so the count in `rpc_client` does not happen when sending a ping message in `ping_nodes` in `membership.rs` because we are calling `RpcAddrClient::call` and not `RpcClient::call` (i.e. we are calling the node using its IP address and not its node identifier). The increment in `rpc_client` is done in `RpcClient::call` because it needs to know the node ID. So there is no redundancy between the two.
 pub struct StatusEntry {
+    /// The IP and port used to connect to this node
     pub addr: SocketAddr,
+    /// Last time this node was seen
     pub last_seen: u64,
+    /// Number of consecutive pings sent without reply to this node
     pub num_failures: AtomicUsize,
     pub state_info: StateInfo,
 }
 impl StatusEntry {
+    /// Is the node associated with this entry considered up?
     pub fn is_up(&self) -> bool {
         self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
     }
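Per the thread above, `num_failures` is incremented by `RpcClient::call` whenever a request addressed by node id fails; pings sent by address through `RpcAddrClient` bypass that path, so there is no double count. A minimal self-contained model of the counter logic; resetting it to zero on success is an assumption made for this sketch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;

struct FailureCounter {
    num_failures: AtomicUsize,
}

impl FailureCounter {
    // Called when an RPC addressed to this node's id fails
    fn record_failure(&self) {
        self.num_failures.fetch_add(1, Ordering::SeqCst);
    }
    // Assumption for illustration: a successful reply clears the counter
    fn record_success(&self) {
        self.num_failures.store(0, Ordering::SeqCst);
    }
    // Same test as StatusEntry::is_up above
    fn is_up(&self) -> bool {
        self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
    }
}
```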
@@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
 }
 
 impl System {
+    /// Create this node's membership manager
     pub fn new(
         metadata_dir: PathBuf,
         rpc_http_client: Arc<RpcHttpClient>,
@@ -279,6 +308,7 @@ impl System {
         });
     }
 
+    /// Get an RPC client
     pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
         RpcClient::new(
             RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@@ -287,6 +317,7 @@ impl System {
         )
     }
 
+    /// Save network configuration to disk
     async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
         let ring = self.ring.borrow().clone();
         self.persist_config
@@ -319,6 +350,7 @@ impl System {
         self.rpc_client.call_many(&to[..], msg, timeout).await;
     }
 
+    /// Perform bootstrapping, starting the ping loop
     pub async fn bootstrap(
         self: Arc<Self>,
         peers: Vec<SocketAddr>,
@@ -348,7 +380,7 @@ impl System {
                 id_option,
                 addr,
                 sys.rpc_client
-                    .by_addr()
+                    .get_addr()
                     .call(&addr, ping_msg_ref, PING_TIMEOUT)
                     .await,
             )


@@ -1,3 +1,4 @@
+//! Module containing types related to computing nodes which should receive a copy of data blocks
trinity-1686a marked this conversation as resolved:
> and metadata entries
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
@@ -8,23 +9,30 @@ use garage_util::data::*;
 // A partition number is encoded on 16 bits,
 // i.e. we have up to 2**16 partitions.
 // (in practice we have exactly 2**PARTITION_BITS partitions)
+/// A partition id, stored on 16 bits
 pub type Partition = u16;
 
 // TODO: make this constant parametrizable in the config file
 // For deployments with many nodes it might make sense to bump
 // it up to 10.
 // Maximum value : 16
+/// How many bits from the hash are used to make partitions. Higher numbers mean more fairness in
+/// presence of numerous nodes, but an exponentially bigger ring. Max 16
 pub const PARTITION_BITS: usize = 8;
 
 const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
 
 // TODO: make this constant parametrizable in the config file
 // (most deployments use a replication factor of 3, so...)
+/// The maximum number of times an object might get replicated
 pub const MAX_REPLICATION: usize = 3;
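To make the constants concrete: with PARTITION_BITS = 8 the ring has 2^8 = 256 entries of MAX_REPLICATION node ids each, and the maximum of 16 would mean 65536 entries, hence the "exponentially bigger ring". The partition of a hash is just its top PARTITION_BITS bits, restated here as a standalone check (mirroring `Ring::partition_of` shown further down):

```rust
// Standalone restatement of the partition arithmetic above.
const PARTITION_BITS: usize = 8;

fn partition_of_prefix(hash_prefix: [u8; 2]) -> u16 {
    // Take the big-endian 16-bit prefix of the hash
    // and keep its top PARTITION_BITS bits.
    let top = u16::from_be_bytes(hash_prefix);
    top >> (16 - PARTITION_BITS)
}

fn main() {
    assert_eq!(partition_of_prefix([0x00, 0x00]), 0);
    assert_eq!(partition_of_prefix([0xff, 0xff]), 255); // 2^8 - 1
    assert_eq!(1usize << PARTITION_BITS, 256);          // ring length
}
```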
+/// The versioned configurations of all nodes known in the network
Review thread (outdated):
> Not really, a node can be known in that it answers ping messages, and not be configured if it is idle or draining pending removal by admins.
> would "available" be correct instead of known?
> Just put a TODO in the comment and I'll write it later
> Suggestion: `The user-defined configuration of the cluster's nodes`
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NetworkConfig {
+    /// Map of each node's id to its configuration
     pub members: HashMap<UUID, NetworkConfigEntry>,
+    /// Version of this config
     pub version: u64,
 }
@@ -37,26 +45,40 @@ impl NetworkConfig {
     }
 }
 
+/// The overall configuration of one (possibly remote) node
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NetworkConfigEntry {
+    /// Datacenter to which this entry belongs. This information might be used to perform better
+    /// geodistribution
     pub datacenter: String,
+    /// The (relative) capacity of the node
     pub capacity: u32,
+    /// A tag to recognize the entry, not used for anything but display
     pub tag: String,
 }
 
+/// A ring distributing objects fairly to nodes
 #[derive(Clone)]
 pub struct Ring {
+    /// The network configuration used to generate this ring
     pub config: NetworkConfig,
+    /// The list of entries in the ring
     pub ring: Vec<RingEntry>,
 }
 
+/// An entry in the ring
 #[derive(Clone, Debug)]
 pub struct RingEntry {
+    /// The prefix of the hash of objects which should use this entry
     pub location: Hash,
+    /// The nodes on which a matching object should get stored
     pub nodes: [UUID; MAX_REPLICATION],
 }
 
 impl Ring {
+    // TODO this function MUST be refactored, it's 100 lines long, with a 50-line loop, going up to
+    // 6 levels of nesting. It is basically impossible to test, maintain, or understand for an
+    // outsider.
     pub(crate) fn new(config: NetworkConfig) -> Self {
         // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
         let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@@ -166,20 +188,16 @@ impl Ring {
             })
             .collect::<Vec<_>>();
 
-        // eprintln!("RING: --");
-        // for e in ring.iter() {
-        //     eprintln!("{:?}", e);
-        // }
-        // eprintln!("END --");
 
         Self { config, ring }
     }
 
+    /// Get the partition in which data would fall
     pub fn partition_of(&self, from: &Hash) -> Partition {
         let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
         top >> (16 - PARTITION_BITS)
     }
+    /// Get the list of partitions and
trinity-1686a marked this conversation as resolved:
> and the first hash (of a partition key) that would fall in this partition
     pub fn partitions(&self) -> Vec<(Partition, Hash)> {
         let mut ret = vec![];
@@ -193,6 +211,7 @@ impl Ring {
         ret
     }
 
+    /// Walk the ring to find the n servers in which data should be replicated
trinity-1686a marked this conversation as resolved:
> TODO rename this function as it doesn't walk the ring anymore but just returns the nodes at the corresponding location. This is vocabulary from the previous implementation that used datacenter-aware ring walking and led to data imbalance.
     pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
         if self.ring.len() != 1 << PARTITION_BITS {
             warn!("Ring not yet ready, read/writes will be lost!");
@@ -201,12 +220,15 @@ impl Ring {
         let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
         let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+        // TODO why compute this twice in the same way and then assert?
lx marked this conversation as resolved:
> just to make sure I don't mess up. Should probably be factorized in a single location.
         assert_eq!(partition_idx, self.partition_of(from) as usize);
 
         let partition = &self.ring[partition_idx];
         let partition_top =
             u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
+        // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
+        // probably be a test rather than a runtime assertion
         assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
 
         assert!(n <= partition.nodes.len());
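Stripped of its assertions, the lookup in walk_ring reduces to "index the ring by the hash's partition and take the first n nodes of that entry", as the review TODO above confirms. A self-contained model of that lookup, with `u64` standing in for UUID and assuming a complete ring of 1 << PARTITION_BITS entries:

```rust
const PARTITION_BITS: usize = 8;
const MAX_REPLICATION: usize = 3;

struct Entry {
    nodes: [u64; MAX_REPLICATION], // u64 stands in for UUID
}

// Model of the lookup performed by walk_ring once the ring is complete
// (ring.len() == 1 << PARTITION_BITS).
fn replicas(ring: &[Entry], hash_top: u16, n: usize) -> Vec<u64> {
    assert!(n <= MAX_REPLICATION);
    let partition_idx = (hash_top >> (16 - PARTITION_BITS)) as usize;
    ring[partition_idx].nodes[..n].to_vec()
}
```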


@@ -1,3 +1,4 @@
+//! Contains structs related to making RPCs
 use std::borrow::Borrow;
 use std::marker::PhantomData;
 use std::net::SocketAddr;
@@ -26,14 +27,19 @@ use crate::tls_util;
 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
 
+/// Strategy to apply when making RPCs
 #[derive(Copy, Clone)]
 pub struct RequestStrategy {
+    /// Max time to wait for a response
     pub rs_timeout: Duration,
+    /// Min number of responses to consider the request successful
     pub rs_quorum: usize,
+    /// Should requests be dropped after enough responses are received
     pub rs_interrupt_after_quorum: bool,
 }
 
 impl RequestStrategy {
+    /// Create a RequestStrategy with default timeout, not interrupting when quorum is reached
     pub fn with_quorum(quorum: usize) -> Self {
         RequestStrategy {
             rs_timeout: DEFAULT_TIMEOUT,
@@ -41,19 +47,24 @@ impl RequestStrategy {
             rs_interrupt_after_quorum: false,
         }
     }
 
+    /// Set the timeout of the strategy
     pub fn with_timeout(mut self, timeout: Duration) -> Self {
         self.rs_timeout = timeout;
         self
     }
 
+    /// Set whether requests can be dropped after quorum has been reached
trinity-1686a marked this conversation as resolved:
> as a general rule: true for read requests, false for write requests
     pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
         self.rs_interrupt_after_quorum = interrupt;
         self
     }
 }
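A usage sketch of the builder defined above; the quorum of 2 and the 5-second timeout are example values, and per the review note, reads typically set interrupt_after_quorum while writes leave it false:

```rust
use std::time::Duration;

// Example values only; RequestStrategy is the builder defined above.
let read_strategy = RequestStrategy::with_quorum(2)
    .with_timeout(Duration::from_secs(5))
    .interrupt_after_quorum(true); // reads: stop once quorum has answered

let write_strategy = RequestStrategy::with_quorum(2); // writes: keep all running
```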
+/// Shortcut for a boxed async function taking a message, and resolving to another message or an
+/// error
 pub type LocalHandlerFn<M> =
     Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
 
+/// Client used to send RPCs
 pub struct RpcClient<M: RpcMessage> {
     status: watch::Receiver<Arc<Status>>,
     background: Arc<BackgroundRunner>,
@@ -64,6 +75,7 @@ pub struct RpcClient<M: RpcMessage> {
 }
 
 impl<M: RpcMessage + 'static> RpcClient<M> {
+    /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
     pub fn new(
         rac: RpcAddrClient<M>,
         background: Arc<BackgroundRunner>,
@@ -77,6 +89,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
         })
     }
 
+    /// Set the local handler, to process RPCs to this node without network usage
     pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
     where
         F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@@ -90,14 +103,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
         self.local_handler.swap(Some(Arc::new((my_id, handler))));
     }
-    pub fn by_addr(&self) -> &RpcAddrClient<M> {
+    /// Get the server address this client connect to
+    pub fn get_addr(&self) -> &RpcAddrClient<M> {
trinity-1686a marked this conversation as resolved:
> Please don't call this `get_addr`, it's not what it does! It does not return a SocketAddr or something like that. It is used to get the underlying RpcAddrClient which allows for RPC by specifying the target node's SocketAddr instead of their node IDs. Also, the comment is wrong.
         &self.rpc_addr_client
     }
 
+    /// Make an RPC call
     pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
         self.call_arc(to, Arc::new(msg), timeout).await
     }
 
+    /// Make an RPC call from a message stored in an Arc
     pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
         if let Some(lh) = self.local_handler.load_full() {
             let (my_id, local_handler) = lh.as_ref();
@@ -135,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
         }
     }
 
+    /// Make an RPC call to multiple servers, returning a Vec containing each result
     pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
         let msg = Arc::new(msg);
         let mut resp_stream = to
@@ -149,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
         results
     }
 
+    /// Make an RPC call to multiple servers, returning either a Vec of responses, or an error if
+    /// the strategy could not be respected due to too many errors
trinity-1686a marked this conversation as resolved:
> too many*
     pub async fn try_call_many(
         self: &Arc<Self>,
         to: &[UUID],
@@ -208,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
     }
 }
 
+/// Endpoint to which to send RPCs
trinity-1686a marked this conversation as resolved:
> No. This is an RPC client which contains only the necessary logic to call RPCs to nodes by specifying their SocketAddr, and not by specifying their node IDs. It is used as an underlying layer for RpcClient.
 pub struct RpcAddrClient<M: RpcMessage> {
     phantom: PhantomData<M>,
@@ -216,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
 }
 
 impl<M: RpcMessage> RpcAddrClient<M> {
+    /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
     pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
         Self {
             phantom: PhantomData::default(),
@@ -224,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
         }
     }
 
+    /// Make an RPC
     pub async fn call<MB>(
         &self,
         to_addr: &SocketAddr,
@@ -239,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
     }
 }
 
+/// HTTP client used to make RPCs
 pub struct RpcHttpClient {
     request_limiter: Semaphore,
     method: ClientMethod,
@@ -250,6 +273,7 @@ enum ClientMethod {
 }
 
 impl RpcHttpClient {
+    /// Create a new RpcHttpClient
     pub fn new(
         max_concurrent_requests: usize,
         tls_config: &Option<TlsConfig>,
@@ -280,6 +304,7 @@ impl RpcHttpClient {
         })
     }
 
+    /// Make an RPC
     async fn call<M, MB>(
         &self,
         path: &str,


@@ -1,3 +1,4 @@
+//! Contains structs related to receiving RPCs
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::pin::Pin;
@@ -22,13 +23,17 @@ use garage_util::error::Error;
 use crate::tls_util;
 
+/// Trait for messages that can be sent as RPCs
 pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
 
 type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
 type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
 
+/// Structure handling RPCs
 pub struct RpcServer {
+    /// The address the RpcServer will bind to
     pub bind_addr: SocketAddr,
+    /// The TLS configuration used for RPCs
     pub tls_config: Option<TlsConfig>,
     handlers: HashMap<String, Handler>,
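Any type that is Serialize + Deserialize + Send + Sync can opt into being an RPC message via the marker trait above. A hypothetical example (`PingPong` is made up for illustration; Garage's real messages, such as membership's Message, follow this same pattern):

```rust
use serde::{Deserialize, Serialize};

// Made-up message type showing the opt-in pattern.
#[derive(Debug, Serialize, Deserialize)]
pub enum PingPong {
    Ping,
    Pong,
}

impl RpcMessage for PingPong {}
```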
@@ -87,6 +92,7 @@ where
 }
 
 impl RpcServer {
+    /// Create a new RpcServer
     pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
         Self {
             bind_addr,
@@ -95,6 +101,7 @@ impl RpcServer {
         }
     }
 
+    /// Add a handler for requests made to `name`
     pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
     where
         M: RpcMessage + 'static,
@@ -156,6 +163,7 @@ impl RpcServer {
     }
 }
 
+    /// Run the RpcServer
     pub async fn run(
         self: Arc<Self>,
         shutdown_signal: impl Future<Output = ()>,
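Pulling the server pieces together, a hypothetical setup sketch. The bind address is an example value, the Message type comes from membership.rs, the handler signature is inferred from the truncated `where` clause above, and `shutdown_signal` is an assumed future; error handling is elided:

```rust
// Hypothetical wiring of RpcServer, based on the API shown in this diff.
let addr: SocketAddr = "0.0.0.0:3901".parse().unwrap();
let mut server = RpcServer::new(addr, None); // None: no TLS

// Handler signature assumed from the `where` clause shown above.
server.add_handler::<Message, _, _>(MEMBERSHIP_RPC_PATH.to_string(), |msg, _from| {
    async move {
        // Inspect `msg` and answer; here we just acknowledge.
        let _ = msg;
        Ok(Message::Ok)
    }
});

// run() consumes an Arc<Self> and a shutdown future:
// Arc::new(server).run(shutdown_signal()).await?;
```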