fixes to RPC networking #721

Merged
lx merged 3 commits from networking-fixes into main 2024-02-19 11:44:05 +00:00
6 changed files with 61 additions and 19 deletions

View file

@ -27,6 +27,7 @@ compression_level = 1
rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6" rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6"
rpc_bind_addr = "[::]:3901" rpc_bind_addr = "[::]:3901"
rpc_bind_outgoing = false
rpc_public_addr = "[fc00:1::1]:3901" rpc_public_addr = "[fc00:1::1]:3901"
bootstrap_peers = [ bootstrap_peers = [
@ -91,6 +92,7 @@ Top-level configuration options:
[`metadata_fsync`](#metadata_fsync), [`metadata_fsync`](#metadata_fsync),
[`replication_mode`](#replication_mode), [`replication_mode`](#replication_mode),
[`rpc_bind_addr`](#rpc_bind_addr), [`rpc_bind_addr`](#rpc_bind_addr),
[`rpc_bind_outgoing`](#rpc_bind_outgoing),
[`rpc_public_addr`](#rpc_public_addr), [`rpc_public_addr`](#rpc_public_addr),
[`rpc_secret`/`rpc_secret_file`](#rpc_secret), [`rpc_secret`/`rpc_secret_file`](#rpc_secret),
[`sled_cache_capacity`](#sled_cache_capacity), [`sled_cache_capacity`](#sled_cache_capacity),
@ -415,6 +417,17 @@ the node, even in the case of a NAT: the NAT should be configured to forward the
port number to the same internal port nubmer. This means that if you have several nodes running port number to the same internal port nubmer. This means that if you have several nodes running
behind a NAT, they should each use a different RPC port number. behind a NAT, they should each use a different RPC port number.
#### `rpc_bind_outgoing` {#rpc_bind_outgoing} (since v0.9.2)
If enabled, pre-bind all sockets for outgoing connections to the same IP address
used for listening (the IP address specified in `rpc_bind_addr`) before
trying to connect to remote nodes.
This can be necessary if a node has multiple IP addresses,
but only one is allowed or able to reach the other nodes,
for instance due to firewall rules or specific routing configuration.
Disabled by default.
#### `rpc_public_addr` {#rpc_public_addr} #### `rpc_public_addr` {#rpc_public_addr}
The address and port that other nodes need to use to contact this node for The address and port that other nodes need to use to contact this node for

View file

@ -203,7 +203,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client // Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None);
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {

View file

@ -13,7 +13,7 @@ use sodiumoxide::crypto::sign::ed25519;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
@ -38,6 +38,11 @@ pub(crate) type VersionTag = [u8; 16];
/// Value of the Netapp version used in the version tag /// Value of the Netapp version used in the version tag
pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005 pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005
/// HelloMessage is sent by the client on a Netapp connection to indicate
/// that they are also a server and ready to recieve incoming connections
/// at the specified address and port. If the client doesn't know their
/// public address, they don't need to specify it and we look at the
/// remote address of the socket is used instead.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub(crate) struct HelloMessage { pub(crate) struct HelloMessage {
pub server_addr: Option<IpAddr>, pub server_addr: Option<IpAddr>,
@ -56,10 +61,8 @@ type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list. /// in order to manage information about the current peer list.
///
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp { pub struct NetApp {
bind_outgoing_to: Option<IpAddr>,
listen_params: ArcSwapOption<ListenParams>, listen_params: ArcSwapOption<ListenParams>,
/// Version tag, 8 bytes for netapp version, 8 bytes for app version /// Version tag, 8 bytes for netapp version, 8 bytes for app version
@ -83,7 +86,7 @@ pub struct NetApp {
struct ListenParams { struct ListenParams {
listen_addr: SocketAddr, listen_addr: SocketAddr,
public_addr: Option<IpAddr>, public_addr: Option<SocketAddr>,
} }
impl NetApp { impl NetApp {
@ -92,13 +95,19 @@ impl NetApp {
/// using `.listen()` /// using `.listen()`
/// ///
/// Our Peer ID is the public key associated to the secret key given here. /// Our Peer ID is the public key associated to the secret key given here.
pub fn new(app_version_tag: u64, netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> { pub fn new(
app_version_tag: u64,
netid: auth::Key,
privkey: ed25519::SecretKey,
bind_outgoing_to: Option<IpAddr>,
) -> Arc<Self> {
let mut version_tag = [0u8; 16]; let mut version_tag = [0u8; 16];
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]); version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]); version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]);
let id = privkey.public_key(); let id = privkey.public_key();
let netapp = Arc::new(Self { let netapp = Arc::new(Self {
bind_outgoing_to,
listen_params: ArcSwapOption::new(None), listen_params: ArcSwapOption::new(None),
version_tag, version_tag,
netid, netid,
@ -180,7 +189,7 @@ impl NetApp {
pub async fn listen( pub async fn listen(
self: Arc<Self>, self: Arc<Self>,
listen_addr: SocketAddr, listen_addr: SocketAddr,
public_addr: Option<IpAddr>, public_addr: Option<SocketAddr>,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
) { ) {
let listen_params = ListenParams { let listen_params = ListenParams {
@ -298,9 +307,20 @@ impl NetApp {
return Ok(()); return Ok(());
} }
let socket = TcpStream::connect(ip).await?; let stream = match self.bind_outgoing_to {
Some(addr) => {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()?
} else {
TcpSocket::new_v6()?
};
socket.bind(SocketAddr::new(addr, 0))?;
socket.connect(ip).await?
}
None => TcpStream::connect(ip).await?,
};
info!("Connected to {}, negotiating handshake...", ip); info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, id).await?; ClientConn::init(self, stream, id).await?;
Ok(()) Ok(())
} }
@ -396,8 +416,11 @@ impl NetApp {
} }
if let Some(lp) = self.listen_params.load_full() { if let Some(lp) = self.listen_params.load_full() {
let server_addr = lp.public_addr; let server_addr = lp.public_addr.map(|x| x.ip());
let server_port = lp.listen_addr.port(); let server_port = lp
.public_addr
.map(|x| x.port())
.unwrap_or(lp.listen_addr.port());
let hello_endpoint = self.hello_endpoint.load_full().unwrap(); let hello_endpoint = self.hello_endpoint.load_full().unwrap();
tokio::spawn(async move { tokio::spawn(async move {
hello_endpoint hello_endpoint

View file

@ -102,7 +102,7 @@ fn run_netapp(
Arc<NetApp>, Arc<NetApp>,
Arc<PeeringManager>, Arc<PeeringManager>,
) { ) {
let netapp = NetApp::new(0u64, netid, sk); let netapp = NetApp::new(0u64, netid, sk, None);
let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None); let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None);
let peering2 = peering.clone(); let peering2 = peering.clone();

View file

@ -98,7 +98,6 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>, rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>, bootstrap_peers: Vec<String>,
@ -325,7 +324,10 @@ impl System {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
} }
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let bind_outgoing_to = Some(config)
.filter(|x| x.rpc_bind_outgoing)
.map(|x| x.rpc_bind_addr.ip());
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr); let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec { if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
peering.set_ping_timeout_millis(ping_timeout); peering.set_ping_timeout_millis(ping_timeout);
@ -369,7 +371,6 @@ impl System {
replication_mode, replication_mode,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
@ -390,9 +391,11 @@ impl System {
/// Perform bootstraping, starting the ping loop /// Perform bootstraping, starting the ping loop
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
join!( join!(
self.netapp self.netapp.clone().listen(
.clone() self.rpc_listen_addr,
.listen(self.rpc_listen_addr, None, must_exit.clone()), self.rpc_public_addr,
must_exit.clone()
),
self.peering.clone().run(must_exit.clone()), self.peering.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()), self.discovery_loop(must_exit.clone()),
self.status_exchange_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()),

View file

@ -55,6 +55,9 @@ pub struct Config {
pub rpc_secret_file: Option<PathBuf>, pub rpc_secret_file: Option<PathBuf>,
/// Address to bind for RPC /// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
/// Bind outgoing sockets to rpc_bind_addr's IP address as well
#[serde(default)]
pub rpc_bind_outgoing: bool,
/// Public IP address of this node /// Public IP address of this node
pub rpc_public_addr: Option<String>, pub rpc_public_addr: Option<String>,