Merge branch 'main' into next-0.10

This commit is contained in:
Alex 2024-02-20 17:02:44 +01:00
commit 643d1aabd8
Signed by: lx
GPG key ID: 0E496D15096376BE
10 changed files with 531 additions and 168 deletions

View file

@ -17,7 +17,7 @@ data_fsync = false
db_engine = "lmdb" db_engine = "lmdb"
block_size = 1048576 block_size = "1M"
sled_cache_capacity = "128MiB" sled_cache_capacity = "128MiB"
sled_flush_every_ms = 2000 sled_flush_every_ms = 2000
@ -27,11 +27,12 @@ compression_level = 1
rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6" rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6"
rpc_bind_addr = "[::]:3901" rpc_bind_addr = "[::]:3901"
rpc_bind_outgoing = false
rpc_public_addr = "[fc00:1::1]:3901" rpc_public_addr = "[fc00:1::1]:3901"
bootstrap_peers = [ bootstrap_peers = [
"563e1ac825ee3323aa441e72c26d1030d6d4414aeb3dd25287c531e7fc2bc95d@[fc00:1::1]:3901", "563e1ac825ee3323aa441e72c26d1030d6d4414aeb3dd25287c531e7fc2bc95d@[fc00:1::1]:3901",
"86f0f26ae4afbd59aaf9cfb059eefac844951efd5b8caeec0d53f4ed6c85f332[fc00:1::2]:3901", "86f0f26ae4afbd59aaf9cfb059eefac844951efd5b8caeec0d53f4ed6c85f332@[fc00:1::2]:3901",
"681456ab91350f92242e80a531a3ec9392cb7c974f72640112f90a600d7921a4@[fc00:B::1]:3901", "681456ab91350f92242e80a531a3ec9392cb7c974f72640112f90a600d7921a4@[fc00:B::1]:3901",
"212fd62eeaca72c122b45a7f4fa0f55e012aa5e24ac384a72a3016413fa724ff@[fc00:F::1]:3901", "212fd62eeaca72c122b45a7f4fa0f55e012aa5e24ac384a72a3016413fa724ff@[fc00:F::1]:3901",
] ]
@ -83,7 +84,7 @@ Top-level configuration options:
[`block_size`](#block_size), [`block_size`](#block_size),
[`bootstrap_peers`](#bootstrap_peers), [`bootstrap_peers`](#bootstrap_peers),
[`compression_level`](#compression_level), [`compression_level`](#compression_level),
[`data_dir`](#metadata_dir), [`data_dir`](#data_dir),
[`data_fsync`](#data_fsync), [`data_fsync`](#data_fsync),
[`db_engine`](#db_engine), [`db_engine`](#db_engine),
[`lmdb_map_size`](#lmdb_map_size), [`lmdb_map_size`](#lmdb_map_size),
@ -91,21 +92,21 @@ Top-level configuration options:
[`metadata_fsync`](#metadata_fsync), [`metadata_fsync`](#metadata_fsync),
[`replication_mode`](#replication_mode), [`replication_mode`](#replication_mode),
[`rpc_bind_addr`](#rpc_bind_addr), [`rpc_bind_addr`](#rpc_bind_addr),
[`rpc_bind_outgoing`](#rpc_bind_outgoing),
[`rpc_public_addr`](#rpc_public_addr), [`rpc_public_addr`](#rpc_public_addr),
[`rpc_secret`](#rpc_secret), [`rpc_secret`/`rpc_secret_file`](#rpc_secret),
[`rpc_secret_file`](#rpc_secret),
[`sled_cache_capacity`](#sled_cache_capacity), [`sled_cache_capacity`](#sled_cache_capacity),
[`sled_flush_every_ms`](#sled_flush_every_ms). [`sled_flush_every_ms`](#sled_flush_every_ms).
The `[consul_discovery]` section: The `[consul_discovery]` section:
[`api`](#consul_api), [`api`](#consul_api),
[`ca_cert`](#consul_ca_cert), [`ca_cert`](#consul_ca_cert),
[`client_cert`](#consul_client_cert), [`client_cert`](#consul_client_cert_and_key),
[`client_key`](#consul_client_cert), [`client_key`](#consul_client_cert_and_key),
[`consul_http_addr`](#consul_http_addr), [`consul_http_addr`](#consul_http_addr),
[`meta`](#consul_tags), [`meta`](#consul_tags_and_meta),
[`service_name`](#consul_service_name), [`service_name`](#consul_service_name),
[`tags`](#consul_tags), [`tags`](#consul_tags_and_meta),
[`tls_skip_verify`](#consul_tls_skip_verify), [`tls_skip_verify`](#consul_tls_skip_verify),
[`token`](#consul_token). [`token`](#consul_token).
@ -125,10 +126,8 @@ The `[s3_web]` section:
The `[admin]` section: The `[admin]` section:
[`api_bind_addr`](#admin_api_bind_addr), [`api_bind_addr`](#admin_api_bind_addr),
[`metrics_token`](#admin_metrics_token), [`metrics_token`/`metrics_token_file`](#admin_metrics_token),
[`metrics_token_file`](#admin_metrics_token), [`admin_token`/`admin_token_file`](#admin_token),
[`admin_token`](#admin_token),
[`admin_token_file`](#admin_token),
[`trace_sink`](#admin_trace_sink), [`trace_sink`](#admin_trace_sink),
@ -418,6 +417,17 @@ the node, even in the case of a NAT: the NAT should be configured to forward the
port number to the same internal port nubmer. This means that if you have several nodes running port number to the same internal port nubmer. This means that if you have several nodes running
behind a NAT, they should each use a different RPC port number. behind a NAT, they should each use a different RPC port number.
#### `rpc_bind_outgoing` {#rpc_bind_outgoing} (since v0.9.2)
If enabled, pre-bind all sockets for outgoing connections to the same IP address
used for listening (the IP address specified in `rpc_bind_addr`) before
trying to connect to remote nodes.
This can be necessary if a node has multiple IP addresses,
but only one is allowed or able to reach the other nodes,
for instance due to firewall rules or specific routing configuration.
Disabled by default.
#### `rpc_public_addr` {#rpc_public_addr} #### `rpc_public_addr` {#rpc_public_addr}
The address and port that other nodes need to use to contact this node for The address and port that other nodes need to use to contact this node for
@ -474,7 +484,7 @@ the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are
`service_name` should be set to the service name under which Garage's `service_name` should be set to the service name under which Garage's
RPC ports are announced. RPC ports are announced.
#### `client_cert`, `client_key` {#consul_client_cert} #### `client_cert`, `client_key` {#consul_client_cert_and_key}
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
Only available when `api = "catalog"`. Only available when `api = "catalog"`.
@ -508,7 +518,7 @@ node_prefix "" {
} }
``` ```
#### `tags` and `meta` {#consul_tags} #### `tags` and `meta` {#consul_tags_and_meta}
Additional list of tags and map of service meta to add during service registration. Additional list of tags and map of service meta to add during service registration.

View file

@ -27,6 +27,112 @@ Exposes the Garage replication factor configured on the node
garage_replication_factor 3 garage_replication_factor 3
``` ```
#### `garage_local_disk_avail` and `garage_local_disk_total` (gauge)
Reports the available and total disk space on each node, for data and metadata separately.
```
garage_local_disk_avail{volume="data"} 540341960704
garage_local_disk_avail{volume="metadata"} 540341960704
garage_local_disk_total{volume="data"} 763063566336
garage_local_disk_total{volume="metadata"} 763063566336
```
### Cluster health status metrics
#### `cluster_healthy` (gauge)
Whether all storage nodes are connected (0 or 1)
```
cluster_healthy 0
```
#### `cluster_available` (gauge)
Whether all requests can be served, even if some storage nodes are disconnected
```
cluster_available 1
```
#### `cluster_connected_nodes` (gauge)
Number of nodes currently connected
```
cluster_connected_nodes 3
```
#### `cluster_known_nodes` (gauge)
Number of nodes already seen once in the cluster
```
cluster_known_nodes 3
```
#### `cluster_layout_node_connected` (gauge)
Connection status for individual nodes of the cluster layout
```
cluster_layout_node_connected{id="62b218d848e86a64",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 1
cluster_layout_node_connected{id="a11c7cf18af29737",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 0
cluster_layout_node_connected{id="a235ac7695e0c54d",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 1
cluster_layout_node_connected{id="b10c110e4e854e5a",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 1
```
#### `cluster_layout_node_disconnected_time` (gauge)
Time (in seconds) since last connection to individual nodes of the cluster layout
```
cluster_layout_node_disconnected_time{id="62b218d848e86a64",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 0
cluster_layout_node_disconnected_time{id="a235ac7695e0c54d",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 0
cluster_layout_node_disconnected_time{id="b10c110e4e854e5a",role_capacity="1000000000",role_gateway="0",role_zone="dc1"} 0
```
#### `cluster_storage_nodes` (gauge)
Number of storage nodes declared in the current layout
```
cluster_storage_nodes 4
```
#### `cluster_storage_nodes_ok` (gauge)
Number of storage nodes currently connected
```
cluster_storage_nodes_ok 3
```
#### `cluster_partitions` (gauge)
Number of partitions in the layout (this is always 256)
```
cluster_partitions 256
```
#### `cluster_partitions_all_ok` (gauge)
Number of partitions for which all storage nodes are connected
```
cluster_partitions_all_ok 64
```
#### `cluster_partitions_quorum` (gauge)
Number of partitions for which we have a quorum of connected nodes and all requests can be served
```
cluster_partitions_quorum 256
```
### Metrics of the API endpoints ### Metrics of the API endpoints
#### `api_admin_request_counter` (counter) #### `api_admin_request_counter` (counter)

View file

@ -203,7 +203,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Generate a temporary keypair for our RPC client // Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair(); let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, sk, None);
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {

View file

@ -162,6 +162,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Netapp exited"); info!("Netapp exited");
// Drop all references so that stuff can terminate properly // Drop all references so that stuff can terminate properly
garage.system.cleanup();
drop(garage); drop(garage);
// Await for all background tasks to end // Await for all background tasks to end

View file

@ -13,7 +13,7 @@ use sodiumoxide::crypto::sign::ed25519;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
@ -38,6 +38,11 @@ pub(crate) type VersionTag = [u8; 16];
/// Value of the Netapp version used in the version tag /// Value of the Netapp version used in the version tag
pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005 pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005
/// HelloMessage is sent by the client on a Netapp connection to indicate
/// that they are also a server and ready to recieve incoming connections
/// at the specified address and port. If the client doesn't know their
/// public address, they don't need to specify it and we look at the
/// remote address of the socket is used instead.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub(crate) struct HelloMessage { pub(crate) struct HelloMessage {
pub server_addr: Option<IpAddr>, pub server_addr: Option<IpAddr>,
@ -56,10 +61,8 @@ type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// NetApp can be used in a stand-alone fashion or together with a peering strategy.
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// If using it alone, you will want to set `on_connect` and `on_disconnect` events
/// in order to manage information about the current peer list. /// in order to manage information about the current peer list.
///
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
/// and RPS peering strategies take care of the most common use cases.
pub struct NetApp { pub struct NetApp {
bind_outgoing_to: Option<IpAddr>,
listen_params: ArcSwapOption<ListenParams>, listen_params: ArcSwapOption<ListenParams>,
/// Version tag, 8 bytes for netapp version, 8 bytes for app version /// Version tag, 8 bytes for netapp version, 8 bytes for app version
@ -83,7 +86,7 @@ pub struct NetApp {
struct ListenParams { struct ListenParams {
listen_addr: SocketAddr, listen_addr: SocketAddr,
public_addr: Option<IpAddr>, public_addr: Option<SocketAddr>,
} }
impl NetApp { impl NetApp {
@ -92,13 +95,19 @@ impl NetApp {
/// using `.listen()` /// using `.listen()`
/// ///
/// Our Peer ID is the public key associated to the secret key given here. /// Our Peer ID is the public key associated to the secret key given here.
pub fn new(app_version_tag: u64, netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> { pub fn new(
app_version_tag: u64,
netid: auth::Key,
privkey: ed25519::SecretKey,
bind_outgoing_to: Option<IpAddr>,
) -> Arc<Self> {
let mut version_tag = [0u8; 16]; let mut version_tag = [0u8; 16];
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]); version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]); version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]);
let id = privkey.public_key(); let id = privkey.public_key();
let netapp = Arc::new(Self { let netapp = Arc::new(Self {
bind_outgoing_to,
listen_params: ArcSwapOption::new(None), listen_params: ArcSwapOption::new(None),
version_tag, version_tag,
netid, netid,
@ -180,7 +189,7 @@ impl NetApp {
pub async fn listen( pub async fn listen(
self: Arc<Self>, self: Arc<Self>,
listen_addr: SocketAddr, listen_addr: SocketAddr,
public_addr: Option<IpAddr>, public_addr: Option<SocketAddr>,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
) { ) {
let listen_params = ListenParams { let listen_params = ListenParams {
@ -298,9 +307,20 @@ impl NetApp {
return Ok(()); return Ok(());
} }
let socket = TcpStream::connect(ip).await?; let stream = match self.bind_outgoing_to {
Some(addr) => {
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()?
} else {
TcpSocket::new_v6()?
};
socket.bind(SocketAddr::new(addr, 0))?;
socket.connect(ip).await?
}
None => TcpStream::connect(ip).await?,
};
info!("Connected to {}, negotiating handshake...", ip); info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, id).await?; ClientConn::init(self, stream, id).await?;
Ok(()) Ok(())
} }
@ -396,8 +416,11 @@ impl NetApp {
} }
if let Some(lp) = self.listen_params.load_full() { if let Some(lp) = self.listen_params.load_full() {
let server_addr = lp.public_addr; let server_addr = lp.public_addr.map(|x| x.ip());
let server_port = lp.listen_addr.port(); let server_port = lp
.public_addr
.map(|x| x.port())
.unwrap_or(lp.listen_addr.port());
let hello_endpoint = self.hello_endpoint.load_full().unwrap(); let hello_endpoint = self.hello_endpoint.load_full().unwrap();
tokio::spawn(async move { tokio::spawn(async move {
hello_endpoint hello_endpoint

View file

@ -80,6 +80,23 @@ impl PeerInfoInternal {
failed_pings: 0, failed_pings: 0,
} }
} }
fn add_addr(&mut self, addr: SocketAddr) -> bool {
if !self.all_addrs.contains(&addr) {
self.all_addrs.push(addr);
// If we are learning a new address for this node,
// we want to retry connecting
self.state = match self.state {
PeerConnState::Trying(_) => PeerConnState::Trying(0),
PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
PeerConnState::Waiting(0, Instant::now())
}
x @ (PeerConnState::Ourself | PeerConnState::Connected) => x,
};
true
} else {
false
}
}
} }
/// Information that the full mesh peering strategy can return about the peers it knows of /// Information that the full mesh peering strategy can return about the peers it knows of
@ -147,23 +164,22 @@ struct KnownHosts {
impl KnownHosts { impl KnownHosts {
fn new() -> Self { fn new() -> Self {
let list = HashMap::new(); let list = HashMap::new();
let hash = Self::calculate_hash(&list); let hash = Self::calculate_hash(vec![]);
Self { list, hash } Self { list, hash }
} }
fn update_hash(&mut self) { fn update_hash(&mut self) {
self.hash = Self::calculate_hash(&self.list); self.hash = Self::calculate_hash(self.connected_peers_vec());
} }
fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> { fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
let mut list = Vec::with_capacity(input.len()); let mut list = Vec::with_capacity(self.list.len());
for (id, peer) in input.iter() { for (id, peer) in self.list.iter() {
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { if peer.state.is_up() {
list.push((*id, peer.addr)); list.push((*id, peer.addr));
} }
} }
list list
} }
fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest { fn calculate_hash(mut list: Vec<(NodeID, SocketAddr)>) -> hash::Digest {
let mut list = Self::map_into_vec(input);
list.sort(); list.sort();
let mut hash_state = hash::State::new(); let mut hash_state = hash::State::new();
for (id, addr) in list { for (id, addr) in list {
@ -214,6 +230,7 @@ impl PeeringManager {
netapp.id, netapp.id,
PeerInfoInternal::new(addr, PeerConnState::Ourself), PeerInfoInternal::new(addr, PeerConnState::Ourself),
); );
known_hosts.update_hash();
} }
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility) // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
@ -234,13 +251,11 @@ impl PeeringManager {
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
let strat2 = strat2.clone();
strat2.on_connected(id, addr, is_incoming); strat2.on_connected(id, addr, is_incoming);
}); });
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
let strat2 = strat2.clone();
strat2.on_disconnected(id, is_incoming); strat2.on_disconnected(id, is_incoming);
}); });
@ -445,7 +460,7 @@ impl PeeringManager {
} }
async fn exchange_peers(self: Arc<Self>, id: &NodeID) { async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let peer_list = self.known_hosts.read().unwrap().connected_peers_vec();
let pex_message = PeerListMessage { list: peer_list }; let pex_message = PeerListMessage { list: peer_list };
match self match self
.peer_list_endpoint .peer_list_endpoint
@ -465,8 +480,7 @@ impl PeeringManager {
let mut changed = false; let mut changed = false;
for (id, addr) in list.iter() { for (id, addr) in list.iter() {
if let Some(kh) = known_hosts.list.get_mut(id) { if let Some(kh) = known_hosts.list.get_mut(id) {
if !kh.all_addrs.contains(addr) { if kh.add_addr(*addr) {
kh.all_addrs.push(*addr);
changed = true; changed = true;
} }
} else { } else {
@ -534,13 +548,11 @@ impl PeeringManager {
} }
} }
fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) { fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
if is_incoming { if is_incoming {
if let Some(host) = known_hosts.list.get_mut(&id) { if let Some(host) = known_hosts.list.get_mut(&id) {
if !host.all_addrs.contains(&addr) { host.add_addr(addr);
host.all_addrs.push(addr);
}
} else { } else {
known_hosts.list.insert(id, self.new_peer(&id, addr)); known_hosts.list.insert(id, self.new_peer(&id, addr));
} }
@ -553,9 +565,7 @@ impl PeeringManager {
if let Some(host) = known_hosts.list.get_mut(&id) { if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected; host.state = PeerConnState::Connected;
host.addr = addr; host.addr = addr;
if !host.all_addrs.contains(&addr) { host.add_addr(addr);
host.all_addrs.push(addr);
}
} else { } else {
known_hosts known_hosts
.list .list
@ -566,7 +576,7 @@ impl PeeringManager {
self.update_public_peer_list(&known_hosts); self.update_public_peer_list(&known_hosts);
} }
fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) { fn on_disconnected(self: &Arc<Self>, id: NodeID, is_incoming: bool) {
if !is_incoming { if !is_incoming {
info!("Connection to {} was closed", hex::encode(&id[..8])); info!("Connection to {} was closed", hex::encode(&id[..8]));
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
@ -608,7 +618,7 @@ impl EndpointHandler<PeerListMessage> for PeeringManager {
_from: NodeID, _from: NodeID,
) -> PeerListMessage { ) -> PeerListMessage {
self.handle_peer_list(&peer_list.list[..]); self.handle_peer_list(&peer_list.list[..]);
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let peer_list = self.known_hosts.read().unwrap().connected_peers_vec();
PeerListMessage { list: peer_list } PeerListMessage { list: peer_list }
} }
} }

View file

@ -102,7 +102,7 @@ fn run_netapp(
Arc<NetApp>, Arc<NetApp>,
Arc<PeeringManager>, Arc<PeeringManager>,
) { ) {
let netapp = NetApp::new(0u64, netid, sk); let netapp = NetApp::new(0u64, netid, sk, None);
let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None); let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None);
let peering2 = peering.clone(); let peering2 = peering.clone();

View file

@ -3,11 +3,10 @@ use std::collections::{HashMap, HashSet};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::ArcSwap; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use futures::join; use futures::join;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -92,7 +91,7 @@ pub struct System {
persist_peer_list: Persister<PeerList>, persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>, pub(crate) local_status: RwLock<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
pub netapp: Arc<NetApp>, pub netapp: Arc<NetApp>,
@ -101,7 +100,6 @@ pub struct System {
pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>, pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>, rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>, bootstrap_peers: Vec<String>,
@ -112,10 +110,10 @@ pub struct System {
pub layout_manager: Arc<LayoutManager>, pub layout_manager: Arc<LayoutManager>,
metrics: SystemMetrics, metrics: ArcSwapOption<SystemMetrics>,
replication_mode: ReplicationMode, replication_mode: ReplicationMode,
replication_factor: usize, pub(crate) replication_factor: usize,
/// Path to metadata directory /// Path to metadata directory
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
@ -171,7 +169,7 @@ pub struct ClusterHealth {
pub partitions_all_ok: usize, pub partitions_all_ok: usize,
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ClusterHealthStatus { pub enum ClusterHealthStatus {
/// All nodes are available /// All nodes are available
Healthy, Healthy,
@ -256,7 +254,10 @@ impl System {
hex::encode(&node_key.public_key()[..8]) hex::encode(&node_key.public_key()[..8])
); );
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let bind_outgoing_to = Some(config)
.filter(|x| x.rpc_bind_outgoing)
.map(|x| x.rpc_bind_addr.ip());
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key, bind_outgoing_to);
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
// ---- setup netapp public listener and full mesh peering strategy ---- // ---- setup netapp public listener and full mesh peering strategy ----
@ -283,11 +284,8 @@ impl System {
replication_mode, replication_mode,
)?; )?;
// ---- set up metrics and status exchange ----
let metrics = SystemMetrics::new(replication_factor);
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
// ---- if enabled, set up additionnal peer discovery methods ---- // ---- if enabled, set up additionnal peer discovery methods ----
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
@ -308,11 +306,11 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build."); warn!("Kubernetes discovery is not enabled in this build.");
} }
// ---- done ---- // ---- almost done ----
let sys = Arc::new(System { let sys = Arc::new(System {
id: netapp.id.into(), id: netapp.id.into(),
persist_peer_list, persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)), local_status: RwLock::new(local_status),
node_status: RwLock::new(HashMap::new()), node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(), netapp: netapp.clone(),
peering: peering.clone(), peering: peering.clone(),
@ -320,7 +318,6 @@ impl System {
replication_mode, replication_mode,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
@ -328,27 +325,39 @@ impl System {
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(), kubernetes_discovery: config.kubernetes_discovery.clone(),
layout_manager, layout_manager,
metrics, metrics: ArcSwapOption::new(None),
metadata_dir: config.metadata_dir.clone(), metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(), data_dir: config.data_dir.clone(),
}); });
sys.system_endpoint.set_handler(sys.clone()); sys.system_endpoint.set_handler(sys.clone());
let metrics = SystemMetrics::new(sys.clone());
sys.metrics.store(Some(Arc::new(metrics)));
Ok(sys) Ok(sys)
} }
/// Perform bootstraping, starting the ping loop /// Perform bootstraping, starting the ping loop
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
join!( join!(
self.netapp self.netapp.clone().listen(
.clone() self.rpc_listen_addr,
.listen(self.rpc_listen_addr, None, must_exit.clone()), self.rpc_public_addr,
must_exit.clone()
),
self.peering.clone().run(must_exit.clone()), self.peering.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()), self.discovery_loop(must_exit.clone()),
self.status_exchange_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()),
); );
} }
pub fn cleanup(&self) {
// Break reference cycle
self.metrics.store(None);
}
// ---- Public utilities / accessors ---- // ---- Public utilities / accessors ----
pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> { pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
@ -511,12 +520,9 @@ impl System {
} }
}; };
let hostname = self.local_status.read().unwrap().hostname.clone();
if let Err(e) = c if let Err(e) = c
.publish_consul_service( .publish_consul_service(self.netapp.id, &hostname, rpc_public_addr)
self.netapp.id,
&self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await .await
{ {
error!("Error while publishing Consul service: {}", e); error!("Error while publishing Consul service: {}", e);
@ -538,26 +544,17 @@ impl System {
} }
}; };
if let Err(e) = publish_kubernetes_node( let hostname = self.local_status.read().unwrap().hostname.clone();
k, if let Err(e) = publish_kubernetes_node(k, self.netapp.id, &hostname, rpc_public_addr).await
self.netapp.id,
&self.local_status.load_full().hostname.as_deref().unwrap(),
rpc_public_addr,
)
.await
{ {
error!("Error while publishing node to Kubernetes: {}", e); error!("Error while publishing node to Kubernetes: {}", e);
} }
} }
fn update_local_status(&self) { fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let mut local_status = self.local_status.write().unwrap();
local_status.layout_digest = self.layout_manager.layout().digest();
new_si.layout_digest = self.layout_manager.layout().digest(); local_status.update_disk_usage(&self.metadata_dir, &self.data_dir);
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
self.local_status.swap(Arc::new(new_si));
} }
// --- RPC HANDLERS --- // --- RPC HANDLERS ---
@ -577,7 +574,7 @@ impl System {
from: Uuid, from: Uuid,
info: &NodeStatus, info: &NodeStatus,
) -> Result<SystemRpc, Error> { ) -> Result<SystemRpc, Error> {
let local_info = self.local_status.load(); let local_info = self.local_status.read().unwrap();
if local_info.replication_factor < info.replication_factor { if local_info.replication_factor < info.replication_factor {
error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.", error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
@ -589,6 +586,8 @@ impl System {
self.layout_manager self.layout_manager
.handle_advertise_status(from, &info.layout_digest); .handle_advertise_status(from, &info.layout_digest);
drop(local_info);
self.node_status self.node_status
.write() .write()
.unwrap() .unwrap()
@ -601,8 +600,10 @@ impl System {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
// Update local node status that is exchanged.
self.update_local_status(); self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let local_status: NodeStatus = self.local_status.read().unwrap().clone();
let _ = self let _ = self
.rpc_helper() .rpc_helper()
.broadcast( .broadcast(
@ -622,15 +623,17 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = self.cluster_layout().check().is_err(); let n_connected = self
let no_peers = self.peering.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self
.peering .peering
.get_peer_list() .get_peer_list()
.iter() .iter()
.filter(|p| p.is_up()) .filter(|p| p.is_up())
.count() != expected_n_nodes; .count();
let not_configured = self.cluster_layout().check().is_err();
let no_peers = n_connected < self.replication_factor;
let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = n_connected != expected_n_nodes;
if not_configured || no_peers || bad_peers { if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
@ -677,6 +680,14 @@ impl System {
} }
} }
if !not_configured && !no_peers {
// If the layout is configured, and we already have some connections
// to other nodes in the cluster, we can skip trying to connect to
// nodes that are not in the cluster layout.
let layout = self.cluster_layout();
ping_list.retain(|(id, _)| layout.all_nodes().contains(&(*id).into()));
}
for (node_id, node_addr) in ping_list { for (node_id, node_addr) in ping_list {
let self2 = self.clone(); let self2 = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -787,12 +798,7 @@ impl NodeStatus {
} }
} }
fn update_disk_usage( fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
&mut self,
meta_dir: &Path,
data_dir: &DataDirEnum,
metrics: &SystemMetrics,
) {
use nix::sys::statvfs::statvfs; use nix::sys::statvfs::statvfs;
let mount_avail = |path: &Path| match statvfs(path) { let mount_avail = |path: &Path| match statvfs(path) {
Ok(x) => { Ok(x) => {
@ -828,27 +834,6 @@ impl NodeStatus {
) )
})(), })(),
}; };
if let Some((avail, total)) = self.meta_disk_avail {
metrics
.values
.meta_disk_avail
.store(avail, Ordering::Relaxed);
metrics
.values
.meta_disk_total
.store(total, Ordering::Relaxed);
}
if let Some((avail, total)) = self.data_disk_avail {
metrics
.values
.data_disk_avail
.store(avail, Ordering::Relaxed);
metrics
.values
.data_disk_total
.store(total, Ordering::Relaxed);
}
} }
} }

View file

@ -1,32 +1,57 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock};
use std::sync::Arc; use std::time::{Duration, Instant};
use opentelemetry::{global, metrics::*, KeyValue}; use opentelemetry::{global, metrics::*, KeyValue};
use crate::system::{ClusterHealthStatus, System};
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct SystemMetrics { pub struct SystemMetrics {
// Static values
pub(crate) _garage_build_info: ValueObserver<u64>, pub(crate) _garage_build_info: ValueObserver<u64>,
pub(crate) _replication_factor: ValueObserver<u64>, pub(crate) _replication_factor: ValueObserver<u64>,
// Disk space values from System::local_status
pub(crate) _disk_avail: ValueObserver<u64>, pub(crate) _disk_avail: ValueObserver<u64>,
pub(crate) _disk_total: ValueObserver<u64>, pub(crate) _disk_total: ValueObserver<u64>,
pub(crate) values: Arc<SystemMetricsValues>,
}
#[derive(Default)] // Health report from System::health()
pub struct SystemMetricsValues { pub(crate) _cluster_healthy: ValueObserver<u64>,
pub(crate) data_disk_total: AtomicU64, pub(crate) _cluster_available: ValueObserver<u64>,
pub(crate) data_disk_avail: AtomicU64, pub(crate) _known_nodes: ValueObserver<u64>,
pub(crate) meta_disk_total: AtomicU64, pub(crate) _connected_nodes: ValueObserver<u64>,
pub(crate) meta_disk_avail: AtomicU64, pub(crate) _storage_nodes: ValueObserver<u64>,
pub(crate) _storage_nodes_ok: ValueObserver<u64>,
pub(crate) _partitions: ValueObserver<u64>,
pub(crate) _partitions_quorum: ValueObserver<u64>,
pub(crate) _partitions_all_ok: ValueObserver<u64>,
// Status report for individual cluster nodes
pub(crate) _layout_node_connected: ValueObserver<u64>,
pub(crate) _layout_node_disconnected_time: ValueObserver<u64>,
} }
impl SystemMetrics { impl SystemMetrics {
pub fn new(replication_factor: usize) -> Self { pub fn new(system: Arc<System>) -> Self {
let meter = global::meter("garage_system"); let meter = global::meter("garage_system");
let values = Arc::new(SystemMetricsValues::default());
let values1 = values.clone(); let health_cache = RwLock::new((Instant::now(), system.health()));
let values2 = values.clone(); let system2 = system.clone();
let get_health = Arc::new(move || {
{
let cache = health_cache.read().unwrap();
if cache.0 > Instant::now() - Duration::from_secs(1) {
return cache.1;
}
}
let health = system2.health();
*health_cache.write().unwrap() = (Instant::now(), health);
health
});
Self { Self {
// Static values
_garage_build_info: meter _garage_build_info: meter
.u64_value_observer("garage_build_info", move |observer| { .u64_value_observer("garage_build_info", move |observer| {
observer.observe( observer.observe(
@ -39,39 +64,239 @@ impl SystemMetrics {
}) })
.with_description("Garage build info") .with_description("Garage build info")
.init(), .init(),
_replication_factor: meter _replication_factor: {
.u64_value_observer("garage_replication_factor", move |observer| { let replication_factor = system.replication_factor;
observer.observe(replication_factor as u64, &[]) meter
.u64_value_observer("garage_replication_factor", move |observer| {
observer.observe(replication_factor as u64, &[])
})
.with_description("Garage replication factor setting")
.init()
},
// Disk space values from System::local_status
_disk_avail: {
let system = system.clone();
meter
.u64_value_observer("garage_local_disk_avail", move |observer| {
let st = system.local_status.read().unwrap();
if let Some((avail, _total)) = st.data_disk_avail {
observer.observe(avail, &[KeyValue::new("volume", "data")]);
}
if let Some((avail, _total)) = st.meta_disk_avail {
observer.observe(avail, &[KeyValue::new("volume", "metadata")]);
}
})
.with_description("Garage available disk space on each node")
.init()
},
_disk_total: {
let system = system.clone();
meter
.u64_value_observer("garage_local_disk_total", move |observer| {
let st = system.local_status.read().unwrap();
if let Some((_avail, total)) = st.data_disk_avail {
observer.observe(total, &[KeyValue::new("volume", "data")]);
}
if let Some((_avail, total)) = st.meta_disk_avail {
observer.observe(total, &[KeyValue::new("volume", "metadata")]);
}
})
.with_description("Garage total disk space on each node")
.init()
},
// Health report from System::()
_cluster_healthy: {
let get_health = get_health.clone();
meter
.u64_value_observer("cluster_healthy", move |observer| {
let h = get_health();
if h.status == ClusterHealthStatus::Healthy {
observer.observe(1, &[]);
} else {
observer.observe(0, &[]);
}
})
.with_description("Whether all storage nodes are connected")
.init()
},
_cluster_available: {
let get_health = get_health.clone();
meter.u64_value_observer("cluster_available", move |observer| {
let h = get_health();
if h.status != ClusterHealthStatus::Unavailable {
observer.observe(1, &[]);
} else {
observer.observe(0, &[]);
}
}) })
.with_description("Garage replication factor setting") .with_description("Whether all requests can be served, even if some storage nodes are disconnected")
.init(), .init()
_disk_avail: meter },
.u64_value_observer("garage_local_disk_avail", move |observer| { _known_nodes: {
match values1.data_disk_avail.load(Ordering::Relaxed) { let get_health = get_health.clone();
0 => (), meter
x => observer.observe(x, &[KeyValue::new("volume", "data")]), .u64_value_observer("cluster_known_nodes", move |observer| {
}; let h = get_health();
match values1.meta_disk_avail.load(Ordering::Relaxed) { observer.observe(h.known_nodes as u64, &[]);
0 => (), })
x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), .with_description("Number of nodes already seen once in the cluster")
}; .init()
}) },
.with_description("Garage available disk space on each node") _connected_nodes: {
.init(), let get_health = get_health.clone();
_disk_total: meter meter
.u64_value_observer("garage_local_disk_total", move |observer| { .u64_value_observer("cluster_connected_nodes", move |observer| {
match values2.data_disk_total.load(Ordering::Relaxed) { let h = get_health();
0 => (), observer.observe(h.connected_nodes as u64, &[]);
x => observer.observe(x, &[KeyValue::new("volume", "data")]), })
}; .with_description("Number of nodes currently connected")
match values2.meta_disk_total.load(Ordering::Relaxed) { .init()
0 => (), },
x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), _storage_nodes: {
}; let get_health = get_health.clone();
}) meter
.with_description("Garage total disk space on each node") .u64_value_observer("cluster_storage_nodes", move |observer| {
.init(), let h = get_health();
values, observer.observe(h.storage_nodes as u64, &[]);
})
.with_description("Number of storage nodes declared in the current layout")
.init()
},
_storage_nodes_ok: {
let get_health = get_health.clone();
meter
.u64_value_observer("cluster_storage_nodes_ok", move |observer| {
let h = get_health();
observer.observe(h.storage_nodes_ok as u64, &[]);
})
.with_description("Number of storage nodes currently connected")
.init()
},
_partitions: {
let get_health = get_health.clone();
meter
.u64_value_observer("cluster_partitions", move |observer| {
let h = get_health();
observer.observe(h.partitions as u64, &[]);
})
.with_description("Number of partitions in the layout")
.init()
},
_partitions_quorum: {
let get_health = get_health.clone();
meter
.u64_value_observer("cluster_partitions_quorum", move |observer| {
let h = get_health();
observer.observe(h.partitions_quorum as u64, &[]);
})
.with_description(
"Number of partitions for which we have a quorum of connected nodes",
)
.init()
},
_partitions_all_ok: {
let get_health = get_health.clone();
meter
.u64_value_observer("cluster_partitions_all_ok", move |observer| {
let h = get_health();
observer.observe(h.partitions_all_ok as u64, &[]);
})
.with_description(
"Number of partitions for which all storage nodes are connected",
)
.init()
},
// Status report for individual cluster nodes
_layout_node_connected: {
let system = system.clone();
meter
.u64_value_observer("cluster_layout_node_connected", move |observer| {
let layout = system.cluster_layout();
let nodes = system.get_known_nodes();
for (id, _, config) in layout.current().roles.items().iter() {
if let Some(role) = &config.0 {
let mut kv = vec![
KeyValue::new("id", format!("{:?}", id)),
KeyValue::new("role_zone", role.zone.clone()),
];
match role.capacity {
Some(cap) => {
kv.push(KeyValue::new("role_capacity", cap as i64));
kv.push(KeyValue::new("role_gateway", 0));
}
None => {
kv.push(KeyValue::new("role_gateway", 1));
}
}
let value;
if let Some(node) = nodes.iter().find(|n| n.id == *id) {
value = if node.is_up { 1 } else { 0 };
// TODO: if we add address and hostname, and those change, we
// get duplicate metrics, due to bad otel aggregation :(
// Can probably be fixed when we upgrade opentelemetry
// kv.push(KeyValue::new("address", node.addr.to_string()));
// kv.push(KeyValue::new(
// "hostname",
// node.status.hostname.clone(),
// ));
} else {
value = 0;
}
observer.observe(value, &kv);
}
}
})
.with_description("Connection status for nodes in the cluster layout")
.init()
},
_layout_node_disconnected_time: {
let system = system.clone();
meter
.u64_value_observer("cluster_layout_node_disconnected_time", move |observer| {
let layout = system.cluster_layout();
let nodes = system.get_known_nodes();
for (id, _, config) in layout.current().roles.items().iter() {
if let Some(role) = &config.0 {
let mut kv = vec![
KeyValue::new("id", format!("{:?}", id)),
KeyValue::new("role_zone", role.zone.clone()),
];
match role.capacity {
Some(cap) => {
kv.push(KeyValue::new("role_capacity", cap as i64));
kv.push(KeyValue::new("role_gateway", 0));
}
None => {
kv.push(KeyValue::new("role_gateway", 1));
}
}
if let Some(node) = nodes.iter().find(|n| n.id == *id) {
// TODO: see comment above
// kv.push(KeyValue::new("address", node.addr.to_string()));
// kv.push(KeyValue::new(
// "hostname",
// node.status.hostname.clone(),
// ));
if node.is_up {
observer.observe(0, &kv);
} else if let Some(secs) = node.last_seen_secs_ago {
observer.observe(secs, &kv);
}
}
}
}
})
.with_description(
"Time (in seconds) since last connection to nodes in the cluster layout",
)
.init()
},
} }
} }
} }

View file

@ -55,6 +55,9 @@ pub struct Config {
pub rpc_secret_file: Option<PathBuf>, pub rpc_secret_file: Option<PathBuf>,
/// Address to bind for RPC /// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
/// Bind outgoing sockets to rpc_bind_addr's IP address as well
#[serde(default)]
pub rpc_bind_outgoing: bool,
/// Public IP address of this node /// Public IP address of this node
pub rpc_public_addr: Option<String>, pub rpc_public_addr: Option<String>,