forked from Deuxfleurs/garage
document rpc crate
This commit is contained in:
parent
f9bd2d8fb7
commit
8e0524ae15
6 changed files with 100 additions and 10 deletions
|
@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
|
||||||
#[structopt(short = "c", long = "capacity")]
|
#[structopt(short = "c", long = "capacity")]
|
||||||
capacity: Option<u32>,
|
capacity: Option<u32>,
|
||||||
|
|
||||||
/// Optionnal node tag
|
/// Optional node tag
|
||||||
#[structopt(short = "t", long = "tag")]
|
#[structopt(short = "t", long = "tag")]
|
||||||
tag: Option<String>,
|
tag: Option<String>,
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
|
#![deny(missing_crate_level_docs, missing_docs)]
|
||||||
|
//! Crate containing rpc related functions and types used in Garage
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod consul;
|
mod consul;
|
||||||
pub(crate) mod tls_util;
|
pub(crate) mod tls_util;
|
||||||
|
|
||||||
pub mod membership;
|
pub mod membership;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
/// Module containing structs related to membership management
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Write as FmtWrite;
|
use std::fmt::Write as FmtWrite;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
const PING_TIMEOUT: Duration = Duration::from_secs(2);
|
||||||
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
|
||||||
|
|
||||||
|
/// RPC endpoint used for calls related to membership
|
||||||
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
|
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
|
||||||
|
|
||||||
|
/// RPC messages related to membership
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
|
/// Response to successfull advertisements
|
||||||
Ok,
|
Ok,
|
||||||
|
/// Message sent to detect other nodes status
|
||||||
Ping(PingMessage),
|
Ping(PingMessage),
|
||||||
|
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
|
||||||
PullStatus,
|
PullStatus,
|
||||||
|
/// Ask other node its config. Answered with AdvertiseConfig
|
||||||
PullConfig,
|
PullConfig,
|
||||||
|
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
|
||||||
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
AdvertiseNodesUp(Vec<AdvertisedNode>),
|
||||||
|
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
|
||||||
AdvertiseConfig(NetworkConfig),
|
AdvertiseConfig(NetworkConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcMessage for Message {}
|
impl RpcMessage for Message {}
|
||||||
|
|
||||||
|
/// A ping, containing informations about status and config
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct PingMessage {
|
pub struct PingMessage {
|
||||||
id: UUID,
|
id: UUID,
|
||||||
|
@ -55,18 +65,25 @@ pub struct PingMessage {
|
||||||
state_info: StateInfo,
|
state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A node advertisement
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AdvertisedNode {
|
pub struct AdvertisedNode {
|
||||||
|
/// Id of the node this advertisement relates to
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
|
/// IP and port of the node
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
|
||||||
|
/// Is the node considered up
|
||||||
pub is_up: bool,
|
pub is_up: bool,
|
||||||
|
/// When was the node last seen up, in milliseconds since UNIX epoch
|
||||||
pub last_seen: u64,
|
pub last_seen: u64,
|
||||||
|
|
||||||
pub state_info: StateInfo,
|
pub state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This node's membership manager
|
||||||
pub struct System {
|
pub struct System {
|
||||||
|
/// The id of this node
|
||||||
pub id: UUID,
|
pub id: UUID,
|
||||||
|
|
||||||
persist_config: Persister<NetworkConfig>,
|
persist_config: Persister<NetworkConfig>,
|
||||||
|
@ -79,10 +96,12 @@ pub struct System {
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
|
|
||||||
pub(crate) status: watch::Receiver<Arc<Status>>,
|
pub(crate) status: watch::Receiver<Arc<Status>>,
|
||||||
|
/// The ring, viewed by this node
|
||||||
pub ring: watch::Receiver<Arc<Ring>>,
|
pub ring: watch::Receiver<Arc<Ring>>,
|
||||||
|
|
||||||
update_lock: Mutex<Updaters>,
|
update_lock: Mutex<Updaters>,
|
||||||
|
|
||||||
|
/// The job runner of this node
|
||||||
pub background: Arc<BackgroundRunner>,
|
pub background: Arc<BackgroundRunner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,21 +110,30 @@ struct Updaters {
|
||||||
update_ring: watch::Sender<Arc<Ring>>,
|
update_ring: watch::Sender<Arc<Ring>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The status of each nodes, viewed by this node
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
|
/// Mapping of each node id to its known status
|
||||||
|
// considering its sorted regularly, maybe it should be a btreeset?
|
||||||
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
|
||||||
|
/// Hash of this entry
|
||||||
pub hash: Hash,
|
pub hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The status of a single node
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct StatusEntry {
|
pub struct StatusEntry {
|
||||||
|
/// The IP and port used to connect to this node
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
/// Last time this node was seen
|
||||||
pub last_seen: u64,
|
pub last_seen: u64,
|
||||||
|
/// Number of consecutive pings sent without reply to this node
|
||||||
pub num_failures: AtomicUsize,
|
pub num_failures: AtomicUsize,
|
||||||
pub state_info: StateInfo,
|
pub state_info: StateInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StatusEntry {
|
impl StatusEntry {
|
||||||
|
/// is the node associated to this entry considered up
|
||||||
pub fn is_up(&self) -> bool {
|
pub fn is_up(&self) -> bool {
|
||||||
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
|
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
|
||||||
}
|
}
|
||||||
|
@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
|
/// Create this node's membership manager
|
||||||
pub fn new(
|
pub fn new(
|
||||||
metadata_dir: PathBuf,
|
metadata_dir: PathBuf,
|
||||||
rpc_http_client: Arc<RpcHttpClient>,
|
rpc_http_client: Arc<RpcHttpClient>,
|
||||||
|
@ -279,6 +308,7 @@ impl System {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get an RPC client
|
||||||
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
|
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
|
||||||
RpcClient::new(
|
RpcClient::new(
|
||||||
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
|
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
|
||||||
|
@ -287,6 +317,7 @@ impl System {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Save network configuration to disc
|
||||||
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
|
||||||
let ring = self.ring.borrow().clone();
|
let ring = self.ring.borrow().clone();
|
||||||
self.persist_config
|
self.persist_config
|
||||||
|
@ -319,6 +350,7 @@ impl System {
|
||||||
self.rpc_client.call_many(&to[..], msg, timeout).await;
|
self.rpc_client.call_many(&to[..], msg, timeout).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Perform bootstraping, starting the ping loop
|
||||||
pub async fn bootstrap(
|
pub async fn bootstrap(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
peers: Vec<SocketAddr>,
|
peers: Vec<SocketAddr>,
|
||||||
|
@ -348,7 +380,7 @@ impl System {
|
||||||
id_option,
|
id_option,
|
||||||
addr,
|
addr,
|
||||||
sys.rpc_client
|
sys.rpc_client
|
||||||
.by_addr()
|
.get_addr()
|
||||||
.call(&addr, ping_msg_ref, PING_TIMEOUT)
|
.call(&addr, ping_msg_ref, PING_TIMEOUT)
|
||||||
.await,
|
.await,
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Module containing types related to computing nodes which should receive a copy of data blocks
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
@ -8,23 +9,30 @@ use garage_util::data::*;
|
||||||
// A partition number is encoded on 16 bits,
|
// A partition number is encoded on 16 bits,
|
||||||
// i.e. we have up to 2**16 partitions.
|
// i.e. we have up to 2**16 partitions.
|
||||||
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
// (in practice we have exactly 2**PARTITION_BITS partitions)
|
||||||
|
/// A partition id, stored on 16 bits
|
||||||
pub type Partition = u16;
|
pub type Partition = u16;
|
||||||
|
|
||||||
// TODO: make this constant parametrizable in the config file
|
// TODO: make this constant parametrizable in the config file
|
||||||
// For deployments with many nodes it might make sense to bump
|
// For deployments with many nodes it might make sense to bump
|
||||||
// it up to 10.
|
// it up to 10.
|
||||||
// Maximum value : 16
|
// Maximum value : 16
|
||||||
|
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
|
||||||
|
/// presence of numerous nodes, but exponentially bigger ring. Max 16
|
||||||
pub const PARTITION_BITS: usize = 8;
|
pub const PARTITION_BITS: usize = 8;
|
||||||
|
|
||||||
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
|
||||||
|
|
||||||
// TODO: make this constant paraetrizable in the config file
|
// TODO: make this constant paraetrizable in the config file
|
||||||
// (most deployments use a replication factor of 3, so...)
|
// (most deployments use a replication factor of 3, so...)
|
||||||
|
/// The maximum number of time an object might get replicated
|
||||||
pub const MAX_REPLICATION: usize = 3;
|
pub const MAX_REPLICATION: usize = 3;
|
||||||
|
|
||||||
|
/// The versionned configurations of all nodes known in the network
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NetworkConfig {
|
pub struct NetworkConfig {
|
||||||
|
/// Map of each node's id to it's configuration
|
||||||
pub members: HashMap<UUID, NetworkConfigEntry>,
|
pub members: HashMap<UUID, NetworkConfigEntry>,
|
||||||
|
/// Version of this config
|
||||||
pub version: u64,
|
pub version: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,26 +45,40 @@ impl NetworkConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The overall configuration of one (possibly remote) node
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct NetworkConfigEntry {
|
pub struct NetworkConfigEntry {
|
||||||
|
/// Datacenter at which this entry belong. This infromation might be used to perform a better
|
||||||
|
/// geodistribution
|
||||||
pub datacenter: String,
|
pub datacenter: String,
|
||||||
|
/// The (relative) capacity of the node
|
||||||
pub capacity: u32,
|
pub capacity: u32,
|
||||||
|
/// A tag to recognize the entry, not used for other things than display
|
||||||
pub tag: String,
|
pub tag: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A ring distributing fairly objects to nodes
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Ring {
|
pub struct Ring {
|
||||||
|
/// The network configuration used to generate this ring
|
||||||
pub config: NetworkConfig,
|
pub config: NetworkConfig,
|
||||||
|
/// The list of entries in the ring
|
||||||
pub ring: Vec<RingEntry>,
|
pub ring: Vec<RingEntry>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An entry in the ring
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct RingEntry {
|
pub struct RingEntry {
|
||||||
|
/// The prefix of the Hash of object which should use this entry
|
||||||
pub location: Hash,
|
pub location: Hash,
|
||||||
|
/// The nodes in which a matching object should get stored
|
||||||
pub nodes: [UUID; MAX_REPLICATION],
|
pub nodes: [UUID; MAX_REPLICATION],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ring {
|
impl Ring {
|
||||||
|
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
|
||||||
|
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
|
||||||
|
// outsider.
|
||||||
pub(crate) fn new(config: NetworkConfig) -> Self {
|
pub(crate) fn new(config: NetworkConfig) -> Self {
|
||||||
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
|
||||||
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
|
||||||
|
@ -166,20 +188,16 @@ impl Ring {
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// eprintln!("RING: --");
|
|
||||||
// for e in ring.iter() {
|
|
||||||
// eprintln!("{:?}", e);
|
|
||||||
// }
|
|
||||||
// eprintln!("END --");
|
|
||||||
|
|
||||||
Self { config, ring }
|
Self { config, ring }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the partition in which data would fall on
|
||||||
pub fn partition_of(&self, from: &Hash) -> Partition {
|
pub fn partition_of(&self, from: &Hash) -> Partition {
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||||
top >> (16 - PARTITION_BITS)
|
top >> (16 - PARTITION_BITS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the list of partitions and
|
||||||
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
|
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
|
|
||||||
|
@ -193,6 +211,7 @@ impl Ring {
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Walk the ring to find the n servers in which data should be replicated
|
||||||
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
|
||||||
if self.ring.len() != 1 << PARTITION_BITS {
|
if self.ring.len() != 1 << PARTITION_BITS {
|
||||||
warn!("Ring not yet ready, read/writes will be lost!");
|
warn!("Ring not yet ready, read/writes will be lost!");
|
||||||
|
@ -201,12 +220,15 @@ impl Ring {
|
||||||
|
|
||||||
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
|
||||||
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
|
||||||
|
// TODO why computing two time in the same way and asserting?
|
||||||
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
assert_eq!(partition_idx, self.partition_of(from) as usize);
|
||||||
|
|
||||||
let partition = &self.ring[partition_idx];
|
let partition = &self.ring[partition_idx];
|
||||||
|
|
||||||
let partition_top =
|
let partition_top =
|
||||||
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
|
||||||
|
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
|
||||||
|
// probably be a test more than a runtime assertion
|
||||||
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
|
||||||
|
|
||||||
assert!(n <= partition.nodes.len());
|
assert!(n <= partition.nodes.len());
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contain structs related to making RPCs
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -26,14 +27,19 @@ use crate::tls_util;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
/// Strategy to apply when making RPC
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct RequestStrategy {
|
pub struct RequestStrategy {
|
||||||
|
/// Max time to wait for reponse
|
||||||
pub rs_timeout: Duration,
|
pub rs_timeout: Duration,
|
||||||
|
/// Min number of response to consider the request successful
|
||||||
pub rs_quorum: usize,
|
pub rs_quorum: usize,
|
||||||
|
/// Should requests be dropped after enough response are received
|
||||||
pub rs_interrupt_after_quorum: bool,
|
pub rs_interrupt_after_quorum: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestStrategy {
|
impl RequestStrategy {
|
||||||
|
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||||
pub fn with_quorum(quorum: usize) -> Self {
|
pub fn with_quorum(quorum: usize) -> Self {
|
||||||
RequestStrategy {
|
RequestStrategy {
|
||||||
rs_timeout: DEFAULT_TIMEOUT,
|
rs_timeout: DEFAULT_TIMEOUT,
|
||||||
|
@ -41,19 +47,24 @@ impl RequestStrategy {
|
||||||
rs_interrupt_after_quorum: false,
|
rs_interrupt_after_quorum: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Set timeout of the strategy
|
||||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||||
self.rs_timeout = timeout;
|
self.rs_timeout = timeout;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
/// Set if requests can be dropped after quorum has been reached
|
||||||
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
||||||
self.rs_interrupt_after_quorum = interrupt;
|
self.rs_interrupt_after_quorum = interrupt;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
|
||||||
|
/// error
|
||||||
pub type LocalHandlerFn<M> =
|
pub type LocalHandlerFn<M> =
|
||||||
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
|
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
|
||||||
|
|
||||||
|
/// Client used to send RPC
|
||||||
pub struct RpcClient<M: RpcMessage> {
|
pub struct RpcClient<M: RpcMessage> {
|
||||||
status: watch::Receiver<Arc<Status>>,
|
status: watch::Receiver<Arc<Status>>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
|
@ -64,6 +75,7 @@ pub struct RpcClient<M: RpcMessage> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage + 'static> RpcClient<M> {
|
impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
|
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
|
||||||
pub fn new(
|
pub fn new(
|
||||||
rac: RpcAddrClient<M>,
|
rac: RpcAddrClient<M>,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
|
@ -77,6 +89,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the local handler, to process RPC to this node without network usage
|
||||||
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
|
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
|
||||||
where
|
where
|
||||||
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
|
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
|
||||||
|
@ -90,14 +103,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
self.local_handler.swap(Some(Arc::new((my_id, handler))));
|
self.local_handler.swap(Some(Arc::new((my_id, handler))));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn by_addr(&self) -> &RpcAddrClient<M> {
|
/// Get the server address this client connect to
|
||||||
|
pub fn get_addr(&self) -> &RpcAddrClient<M> {
|
||||||
&self.rpc_addr_client
|
&self.rpc_addr_client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call
|
||||||
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
|
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
|
||||||
self.call_arc(to, Arc::new(msg), timeout).await
|
self.call_arc(to, Arc::new(msg), timeout).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call from a message stored in an Arc
|
||||||
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
|
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
|
||||||
if let Some(lh) = self.local_handler.load_full() {
|
if let Some(lh) = self.local_handler.load_full() {
|
||||||
let (my_id, local_handler) = lh.as_ref();
|
let (my_id, local_handler) = lh.as_ref();
|
||||||
|
@ -135,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call to multiple servers, returning a Vec containing each result
|
||||||
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
|
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
|
||||||
let msg = Arc::new(msg);
|
let msg = Arc::new(msg);
|
||||||
let mut resp_stream = to
|
let mut resp_stream = to
|
||||||
|
@ -149,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
|
||||||
|
/// strategy could not be respected due to to many errors
|
||||||
pub async fn try_call_many(
|
pub async fn try_call_many(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
to: &[UUID],
|
to: &[UUID],
|
||||||
|
@ -208,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Endpoint to which send RPC
|
||||||
pub struct RpcAddrClient<M: RpcMessage> {
|
pub struct RpcAddrClient<M: RpcMessage> {
|
||||||
phantom: PhantomData<M>,
|
phantom: PhantomData<M>,
|
||||||
|
|
||||||
|
@ -216,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: RpcMessage> RpcAddrClient<M> {
|
impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
|
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
|
||||||
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
|
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
phantom: PhantomData::default(),
|
phantom: PhantomData::default(),
|
||||||
|
@ -224,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC
|
||||||
pub async fn call<MB>(
|
pub async fn call<MB>(
|
||||||
&self,
|
&self,
|
||||||
to_addr: &SocketAddr,
|
to_addr: &SocketAddr,
|
||||||
|
@ -239,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// HTTP client used to make RPCs
|
||||||
pub struct RpcHttpClient {
|
pub struct RpcHttpClient {
|
||||||
request_limiter: Semaphore,
|
request_limiter: Semaphore,
|
||||||
method: ClientMethod,
|
method: ClientMethod,
|
||||||
|
@ -250,6 +273,7 @@ enum ClientMethod {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcHttpClient {
|
impl RpcHttpClient {
|
||||||
|
/// Create a new RpcHttpClient
|
||||||
pub fn new(
|
pub fn new(
|
||||||
max_concurrent_requests: usize,
|
max_concurrent_requests: usize,
|
||||||
tls_config: &Option<TlsConfig>,
|
tls_config: &Option<TlsConfig>,
|
||||||
|
@ -280,6 +304,7 @@ impl RpcHttpClient {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Make a RPC
|
||||||
async fn call<M, MB>(
|
async fn call<M, MB>(
|
||||||
&self,
|
&self,
|
||||||
path: &str,
|
path: &str,
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! Contains structs related to receiving RPCs
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
@ -22,13 +23,17 @@ use garage_util::error::Error;
|
||||||
|
|
||||||
use crate::tls_util;
|
use crate::tls_util;
|
||||||
|
|
||||||
|
/// Trait for messages that can be sent as RPC
|
||||||
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
|
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
|
||||||
|
|
||||||
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
|
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
|
||||||
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
|
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
|
||||||
|
|
||||||
|
/// Structure handling RPCs
|
||||||
pub struct RpcServer {
|
pub struct RpcServer {
|
||||||
|
/// The address the RpcServer will bind
|
||||||
pub bind_addr: SocketAddr,
|
pub bind_addr: SocketAddr,
|
||||||
|
/// The tls configuration used for RPC
|
||||||
pub tls_config: Option<TlsConfig>,
|
pub tls_config: Option<TlsConfig>,
|
||||||
|
|
||||||
handlers: HashMap<String, Handler>,
|
handlers: HashMap<String, Handler>,
|
||||||
|
@ -87,6 +92,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcServer {
|
impl RpcServer {
|
||||||
|
/// Create a new RpcServer
|
||||||
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
|
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
bind_addr,
|
bind_addr,
|
||||||
|
@ -95,6 +101,7 @@ impl RpcServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add handler handling request made to `name`
|
||||||
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
|
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
|
||||||
where
|
where
|
||||||
M: RpcMessage + 'static,
|
M: RpcMessage + 'static,
|
||||||
|
@ -156,6 +163,7 @@ impl RpcServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run the RpcServer
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
|
|
Loading…
Reference in a new issue