From 8e0524ae15e5252aecd30dc01d6993810cf49811 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Mon, 22 Mar 2021 00:00:09 +0100 Subject: [PATCH] document rpc crate --- src/garage/cli.rs | 2 +- src/rpc/lib.rs | 5 ++++- src/rpc/membership.rs | 34 +++++++++++++++++++++++++++++++++- src/rpc/ring.rs | 34 ++++++++++++++++++++++++++++------ src/rpc/rpc_client.rs | 27 ++++++++++++++++++++++++++- src/rpc/rpc_server.rs | 8 ++++++++ 6 files changed, 100 insertions(+), 10 deletions(-) diff --git a/src/garage/cli.rs b/src/garage/cli.rs index eb8275a..55cd222 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt { #[structopt(short = "c", long = "capacity")] capacity: Option, - /// Optionnal node tag + /// Optional node tag #[structopt(short = "t", long = "tag")] tag: Option, diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 00e31f5..3832607 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,7 +1,10 @@ +#![deny(missing_crate_level_docs, missing_docs)] +//! Crate containing rpc related functions and types used in Garage + #[macro_use] extern crate log; -pub mod consul; +mod consul; pub(crate) mod tls_util; pub mod membership; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 9fb24ad..c465ce6 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,3 +1,4 @@ +/// Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; +/// RPC endpoint used for calls related to membership pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; +/// RPC messages related to membership #[derive(Debug, Serialize, Deserialize)] pub enum Message { + /// Response to successfull advertisements Ok, + /// Message sent to detect other nodes status Ping(PingMessage), + /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp PullStatus, + /// Ask other node its config. Answered with AdvertiseConfig PullConfig, + /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus AdvertiseNodesUp(Vec), + /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), } impl RpcMessage for Message {} +/// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { id: UUID, @@ -55,18 +65,25 @@ pub struct PingMessage { state_info: StateInfo, } +/// A node advertisement #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { + /// Id of the node this advertisement relates to pub id: UUID, + /// IP and port of the node pub addr: SocketAddr, + /// Is the node considered up pub is_up: bool, + /// When was the node last seen up, in milliseconds since UNIX epoch pub last_seen: u64, pub state_info: StateInfo, } +/// This node's membership manager pub struct System { + /// The id of this node pub id: UUID, persist_config: Persister, @@ -79,10 +96,12 @@ pub struct System { rpc_client: Arc>, pub(crate) status: watch::Receiver>, + /// The ring, viewed by this node pub ring: watch::Receiver>, update_lock: Mutex, + /// The job runner of this node pub background: Arc, } @@ -91,21 +110,30 @@ struct Updaters { update_ring: watch::Sender>, } +/// The status of each nodes, viewed by this node #[derive(Debug, Clone)] pub struct Status { + /// Mapping of each node id to its known status + // considering its sorted regularly, maybe it should be a btreeset? pub nodes: HashMap>, + /// Hash of this entry pub hash: Hash, } +/// The status of a single node #[derive(Debug)] pub struct StatusEntry { + /// The IP and port used to connect to this node pub addr: SocketAddr, + /// Last time this node was seen pub last_seen: u64, + /// Number of consecutive pings sent without reply to this node pub num_failures: AtomicUsize, pub state_info: StateInfo, } impl StatusEntry { + /// is the node associated to this entry considered up pub fn is_up(&self) -> bool { self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN } @@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result { } impl System { + /// Create this node's membership manager pub fn new( metadata_dir: PathBuf, rpc_http_client: Arc, @@ -279,6 +308,7 @@ impl System { }); } + /// Get an RPC client pub fn rpc_client(self: &Arc, path: &str) -> Arc> { RpcClient::new( RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), @@ -287,6 +317,7 @@ impl System { ) } + /// Save network configuration to disc async fn save_network_config(self: Arc) -> Result<(), Error> { let ring = self.ring.borrow().clone(); self.persist_config @@ -319,6 +350,7 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } + /// Perform bootstraping, starting the ping loop pub async fn bootstrap( self: Arc, peers: Vec, @@ -348,7 +380,7 @@ impl System { id_option, addr, sys.rpc_client - .by_addr() + .get_addr() .call(&addr, ping_msg_ref, PING_TIMEOUT) .await, ) diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 2e99752..6f341fa 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,3 +1,4 @@ +//! Module containing types related to computing nodes which should receive a copy of data blocks use std::collections::{HashMap, HashSet}; use std::convert::TryInto; @@ -8,23 +9,30 @@ use garage_util::data::*; // A partition number is encoded on 16 bits, // i.e. we have up to 2**16 partitions. // (in practice we have exactly 2**PARTITION_BITS partitions) +/// A partition id, stored on 16 bits pub type Partition = u16; // TODO: make this constant parametrizable in the config file // For deployments with many nodes it might make sense to bump // it up to 10. // Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); // TODO: make this constant paraetrizable in the config file // (most deployments use a replication factor of 3, so...) +/// The maximum number of time an object might get replicated pub const MAX_REPLICATION: usize = 3; +/// The versionned configurations of all nodes known in the network #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { + /// Map of each node's id to it's configuration pub members: HashMap, + /// Version of this config pub version: u64, } @@ -37,26 +45,40 @@ impl NetworkConfig { } } +/// The overall configuration of one (possibly remote) node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfigEntry { + /// Datacenter at which this entry belong. This infromation might be used to perform a better + /// geodistribution pub datacenter: String, + /// The (relative) capacity of the node pub capacity: u32, + /// A tag to recognize the entry, not used for other things than display pub tag: String, } +/// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { + /// The network configuration used to generate this ring pub config: NetworkConfig, + /// The list of entries in the ring pub ring: Vec, } +/// An entry in the ring #[derive(Clone, Debug)] pub struct RingEntry { + /// The prefix of the Hash of object which should use this entry pub location: Hash, + /// The nodes in which a matching object should get stored pub nodes: [UUID; MAX_REPLICATION], } impl Ring { + // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 + // levels of imbrication. It is basically impossible to test, maintain, or understand for an + // outsider. pub(crate) fn new(config: NetworkConfig) -> Self { // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::>(); @@ -166,20 +188,16 @@ impl Ring { }) .collect::>(); - // eprintln!("RING: --"); - // for e in ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - Self { config, ring } } + /// Get the partition in which data would fall on pub fn partition_of(&self, from: &Hash) -> Partition { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } + /// Get the list of partitions and pub fn partitions(&self) -> Vec<(Partition, Hash)> { let mut ret = vec![]; @@ -193,6 +211,7 @@ impl Ring { ret } + /// Walk the ring to find the n servers in which data should be replicated pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); @@ -201,12 +220,15 @@ impl Ring { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + // TODO why computing two time in the same way and asserting? assert_eq!(partition_idx, self.partition_of(from) as usize); let partition = &self.ring[partition_idx]; let partition_top = u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); + // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should + // probably be a test more than a runtime assertion assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert!(n <= partition.nodes.len()); diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index eb4f662..261dec7 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,24 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn = Box) -> Pin> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient { status: watch::Receiver>, background: Arc, @@ -64,6 +75,7 @@ pub struct RpcClient { } impl RpcClient { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient, background: Arc, @@ -77,6 +89,7 @@ impl RpcClient { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler(&self, my_id: UUID, handler: F) where F: Fn(Arc) -> Fut + Send + Sync + 'static, @@ -90,14 +103,17 @@ impl RpcClient { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } - pub fn by_addr(&self) -> &RpcAddrClient { + /// Get the server address this client connect to + pub fn get_addr(&self) -> &RpcAddrClient { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc, timeout: Duration) -> Result { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -135,6 +151,7 @@ impl RpcClient { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec> { let msg = Arc::new(msg); let mut resp_stream = to @@ -149,6 +166,8 @@ impl RpcClient { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to to many errors pub async fn try_call_many( self: &Arc, to: &[UUID], @@ -208,6 +227,7 @@ impl RpcClient { } } +/// Endpoint to which send RPC pub struct RpcAddrClient { phantom: PhantomData, @@ -216,6 +236,7 @@ pub struct RpcAddrClient { } impl RpcAddrClient { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -224,6 +245,7 @@ impl RpcAddrClient { } } + /// Make a RPC pub async fn call( &self, to_addr: &SocketAddr, @@ -239,6 +261,7 @@ impl RpcAddrClient { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -250,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option, @@ -280,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call( &self, path: &str, diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 0d82d79..4419a6f 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -1,3 +1,4 @@ +//! Contains structs related to receiving RPCs use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; @@ -22,13 +23,17 @@ use garage_util::error::Error; use crate::tls_util; +/// Trait for messages that can be sent as RPC pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} type ResponseFuture = Pin, Error>> + Send>>; type Handler = Box, SocketAddr) -> ResponseFuture + Send + Sync>; +/// Structure handling RPCs pub struct RpcServer { + /// The address the RpcServer will bind pub bind_addr: SocketAddr, + /// The tls configuration used for RPC pub tls_config: Option, handlers: HashMap, @@ -87,6 +92,7 @@ where } impl RpcServer { + /// Create a new RpcServer pub fn new(bind_addr: SocketAddr, tls_config: Option) -> Self { Self { bind_addr, @@ -95,6 +101,7 @@ impl RpcServer { } } + /// Add handler handling request made to `name` pub fn add_handler(&mut self, name: String, handler: F) where M: RpcMessage + 'static, @@ -156,6 +163,7 @@ impl RpcServer { } } + /// Run the RpcServer pub async fn run( self: Arc, shutdown_signal: impl Future,