everywhere: use netapp with unix socket support

This commit is contained in:
networkException 2023-11-05 22:27:21 +01:00
parent e6df7089a1
commit ef508f4486
Signed by untrusted user: networkException
GPG key ID: E3877443AE684391
19 changed files with 573 additions and 224 deletions

537
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -39,6 +39,7 @@ futures-util = "0.3"
pin-project = "1.0.12"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = "0.1"
tokio-unix-tcp = "0.2.0"
form_urlencoded = "1.0.0"
http = "0.2"

View file

@ -7,6 +7,8 @@ use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{Body, Request, Response, StatusCode};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")]
@ -17,7 +19,6 @@ use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::generic_server::*;
@ -62,7 +63,7 @@ impl AdminApiServer {
pub async fn run(
self,
bind_addr: UnixOrTCPSocketAddress,
bind_addr: NamedSocketAddr,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();

View file

@ -1,9 +1,11 @@
use std::net::SocketAddr;
use std::collections::HashMap;
use std::sync::Arc;
use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use tokio_unix_tcp::SocketAddr;
use garage_util::crdt::*;
use garage_util::data::*;

View file

@ -16,6 +16,8 @@ use hyperlocal::UnixServerExt;
use tokio::net::UnixStream;
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
@ -26,7 +28,6 @@ use opentelemetry::{
use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str;
@ -97,7 +98,7 @@ impl<A: ApiHandler> ApiServer<A> {
pub async fn run_server(
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
bind_addr: NamedSocketAddr,
unix_bind_addr_mode: Option<u32>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
@ -134,13 +135,13 @@ impl<A: ApiHandler> ApiServer<A> {
);
match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
NamedSocketAddr::Inet(addr) => {
Server::bind(&addr)
.serve(tcp_service)
.with_graceful_shutdown(shutdown_signal)
.await?
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
NamedSocketAddr::Unix(ref path) => {
if path.exists() {
fs::remove_file(path)?
}

View file

@ -5,10 +5,11 @@ use async_trait::async_trait;
use futures::future::Future;
use hyper::{Body, Method, Request, Response};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
@ -37,7 +38,7 @@ pub(crate) struct K2VApiEndpoint {
impl K2VApiServer {
pub async fn run(
garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress,
bind_addr: NamedSocketAddr,
s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {

View file

@ -6,10 +6,11 @@ use futures::future::Future;
use hyper::header;
use hyper::{Body, Request, Response};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
@ -46,7 +47,7 @@ pub(crate) struct S3ApiEndpoint {
impl S3ApiServer {
pub async fn run(
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
addr: NamedSocketAddr,
s3_region: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {

View file

@ -52,8 +52,9 @@ toml = "0.6"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-unix-tcp = "0.2.0"
netapp = "0.10"
netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = { version = "0.10", optional = true }

View file

@ -2,6 +2,8 @@ use std::path::PathBuf;
use garage_util::error::*;
use tokio_unix_tcp::NamedSocketAddr;
pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)";
pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
@ -21,12 +23,17 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
let idstr = hex::encode(node_id);
println!("{}", idstr);
match config.rpc_bind_addr {
NamedSocketAddr::Inet(addr) => {
if !quiet {
warn!("WARNING: I don't know the public address to reach this node.");
warn!("In all of the instructions below, replace 127.0.0.1:{} by the appropriate address and port.", config.rpc_bind_addr.port());
warn!("In all of the instructions below, replace 127.0.0.1:{} by the appropriate address and port.", addr.port());
}
format!("{}@127.0.0.1:{}", idstr, config.rpc_bind_addr.port())
format!("{}@127.0.0.1:{}", idstr, addr.port())
}
NamedSocketAddr::Unix(path) => format!("{}@{}", idstr, path.to_string_lossy()),
}
};
if !quiet {

View file

@ -21,6 +21,7 @@ compile_error!("Only one of bundled-libs and system-libs Cargo features must be
compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
use std::net::SocketAddr;
use std::net;
use std::path::PathBuf;
use structopt::StructOpt;
@ -36,6 +37,8 @@ use garage_rpc::*;
use garage_model::helper::error::Error as HelperError;
use tokio_unix_tcp::NamedSocketAddr;
use admin::*;
use cli::*;
@ -218,8 +221,8 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
(id, addrs[0], false)
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port> or <pubkey>@<path> for unix domain sockets.", h))?;
(id, addrs[0].clone(), false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
@ -230,18 +233,24 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.next()
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a, false)
(node_id, NamedSocketAddr::Inet(a), false)
} else {
let default_addr = SocketAddr::new(
"127.0.0.1".parse().unwrap(),
config.as_ref().unwrap().rpc_bind_addr.port(),
);
let default_addr =
config
.as_ref()
.unwrap()
.rpc_bind_addr
.clone()
.map_inet(|socket_addr| {
net::SocketAddr::new("127.0.0.1".parse().unwrap(), socket_addr.port())
});
(node_id, default_addr, true)
}
};
// Connect to target host
if let Err(e) = netapp.clone().try_connect(addr, id).await {
if let Err(e) = netapp.clone().try_connect(addr.clone().into(), id).await {
if is_default_addr {
warn!(
"Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",

View file

@ -39,7 +39,7 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
netapp = "0.10"
netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
[features]
default = [ "sled", "lmdb", "sqlite" ]

View file

@ -47,9 +47,10 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-unix-tcp = "0.2.0"
opentelemetry = "0.17"
netapp = { version = "0.10", features = ["telemetry"] }
netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879", features = ["telemetry"] }
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]

View file

@ -1,8 +1,9 @@
//! Module containing structs related to membership management
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::net::{self, IpAddr};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@ -16,6 +17,8 @@ use tokio::select;
use tokio::sync::watch;
use tokio::sync::Mutex;
use tokio_unix_tcp::{NamedSocketAddr, SocketAddr};
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@ -77,7 +80,7 @@ impl Rpc for SystemRpc {
}
#[derive(Serialize, Deserialize)]
pub struct PeerList(Vec<(Uuid, SocketAddr)>);
pub struct PeerList(Vec<(Uuid, NamedSocketAddr)>);
impl garage_util::migrate::InitialFormat for PeerList {}
/// This node's membership manager
@ -97,9 +100,9 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
rpc_listen_addr: NamedSocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>,
rpc_public_addr: Option<SocketBindAddr>,
bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")]
@ -289,10 +292,13 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
let rpc_public_addr = match &config.rpc_public_addr {
let rpc_public_addr: Option<NamedSocketAddr> = match &config.rpc_public_addr {
Some(a_str) => {
// FIXME: Remove this unwrap
match NamedSocketAddr::from_str(a_str).unwrap() {
NamedSocketAddr::Inet(address) => {
use std::net::ToSocketAddrs;
match a_str.to_socket_addrs() {
match address.to_socket_addrs() {
Err(e) => {
error!(
"Cannot resolve rpc_public_addr {} from config file: {}.",
@ -303,19 +309,30 @@ impl System {
Ok(a) => {
let a = a.collect::<Vec<_>>();
if a.is_empty() {
error!("rpc_public_addr {} resolve to no known IP address", a_str);
error!(
"rpc_public_addr {} resolve to no known IP address",
a_str
);
}
if a.len() > 1 {
warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
}
a.into_iter().next()
a.into_iter().next().map(NamedSocketAddr::Inet)
}
}
}
NamedSocketAddr::Unix(path) => Some(path.into()),
}
}
None => {
let addr =
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
if let Some(a) = addr {
let addr: Option<NamedSocketAddr> = match &config.rpc_bind_addr {
NamedSocketAddr::Inet(address) => {
get_default_ip().map(|ip| net::SocketAddr::new(ip, address.port()).into())
}
NamedSocketAddr::Unix(path) => Some(path.to_owned().into()),
};
if let Some(a) = &addr {
warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
}
addr
@ -368,7 +385,7 @@ impl System {
system_endpoint,
replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
rpc_listen_addr: config.rpc_bind_addr.clone().into(),
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
@ -392,7 +409,7 @@ impl System {
join!(
self.netapp
.clone()
.listen(self.rpc_listen_addr, None, must_exit.clone()),
.listen(self.rpc_listen_addr.clone(), None, must_exit.clone()),
self.fullmesh.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()),
self.status_exchange_loop(must_exit.clone()),
@ -410,7 +427,7 @@ impl System {
.iter()
.map(|n| KnownNodeInfo {
id: n.id.into(),
addr: n.addr,
addr: n.addr.clone().to_socket_addr(),
is_up: n.is_up(),
last_seen_secs_ago: n
.last_seen
@ -448,12 +465,17 @@ impl System {
})?;
let mut errors = vec![];
for addr in addrs.iter() {
match self.netapp.clone().try_connect(*addr, pubkey).await {
match self
.netapp
.clone()
.try_connect(addr.clone().into(), pubkey)
.await
{
Ok(()) => return Ok(()),
Err(e) => {
errors.push((
*addr,
Error::Message(connect_error_message(*addr, pubkey, e)),
addr.clone(),
Error::Message(connect_error_message(addr.clone().into(), pubkey, e)),
));
}
}
@ -742,7 +764,12 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
ping_list.extend(
peers
.0
.iter()
.map(|(id, addr)| ((*id).into(), addr.clone())),
)
}
// Fetch peer list from Consul
@ -783,8 +810,13 @@ impl System {
for (node_id, node_addr) in ping_list {
let self2 = self.clone();
tokio::spawn(async move {
if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
error!("{}", connect_error_message(node_addr, node_id, e));
if let Err(e) = self2
.netapp
.clone()
.try_connect(node_addr.clone(), node_id)
.await
{
error!("{}", connect_error_message(node_addr.into(), node_id, e));
}
});
}
@ -814,7 +846,7 @@ impl System {
.fullmesh
.get_peer_list()
.iter()
.map(|n| (n.id.into(), n.addr))
.map(|n| (n.id.into(), n.addr.clone()))
.collect::<Vec<_>>();
// Before doing it, we read the current peer list file (if it exists)
@ -963,7 +995,7 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, NamedSocketAddr)> {
let mut ret = vec![];
for peer in peers.iter() {

View file

@ -39,8 +39,9 @@ toml = "0.6"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-unix-tcp = { version = "0.2.0", features = ["serde"] }
netapp = "0.10"
netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
http = "0.2"
hyper = "0.14"

View file

@ -1,13 +1,13 @@
//! Contains type and functions related to Garage configuration file
use std::convert::TryFrom;
use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
use tokio_unix_tcp::NamedSocketAddr;
use serde::{de, Deserialize};
use crate::error::Error;
use crate::socket_address::UnixOrTCPSocketAddress;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)]
@ -51,7 +51,8 @@ pub struct Config {
pub rpc_secret_file: Option<String>,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
#[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub rpc_bind_addr: NamedSocketAddr,
/// Public IP address of this node
pub rpc_public_addr: Option<String>,
@ -130,7 +131,8 @@ pub struct DataDir {
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: Option<UnixOrTCPSocketAddress>,
#[serde(deserialize_with = "NamedSocketAddr::deserialize_from_option_str")]
pub api_bind_addr: Option<NamedSocketAddr>,
/// S3 region to use
pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None,
@ -142,14 +144,16 @@ pub struct S3ApiConfig {
#[derive(Deserialize, Debug, Clone)]
pub struct K2VApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: UnixOrTCPSocketAddress,
#[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub api_bind_addr: NamedSocketAddr,
}
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: UnixOrTCPSocketAddress,
#[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub bind_addr: NamedSocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String,
}
@ -158,7 +162,8 @@ pub struct WebConfig {
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<UnixOrTCPSocketAddress>,
#[serde(deserialize_with = "NamedSocketAddr::deserialize_from_option_str")]
pub api_bind_addr: Option<NamedSocketAddr>,
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,

View file

@ -14,7 +14,6 @@ pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
pub mod persister;
pub mod socket_address;
pub mod time;
pub mod tranquilizer;
pub mod version;

View file

@ -1,44 +0,0 @@
use std::fmt::{Debug, Display, Formatter};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use serde::de::Error;
use serde::{Deserialize, Deserializer};
#[derive(Debug, Clone)]
pub enum UnixOrTCPSocketAddress {
TCPSocket(SocketAddr),
UnixSocket(PathBuf),
}
impl Display for UnixOrTCPSocketAddress {
fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
match self {
UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address),
UnixOrTCPSocketAddress::UnixSocket(path) => {
write!(formatter, "http+unix://{}", path.to_string_lossy())
}
}
}
}
impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
let string = string.as_str();
if string.starts_with("/") {
Ok(UnixOrTCPSocketAddress::UnixSocket(
PathBuf::from_str(string).map_err(Error::custom)?,
))
} else {
Ok(UnixOrTCPSocketAddress::TCPSocket(
SocketAddr::from_str(string).map_err(Error::custom)?,
))
}
}
}

View file

@ -30,5 +30,6 @@ hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "st
hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
tokio = { version = "1.0", default-features = false, features = ["net"] }
tokio-unix-tcp = "0.2.0"
opentelemetry = "0.17"

View file

@ -13,6 +13,8 @@ use hyper::{
use hyperlocal::UnixServerExt;
use tokio_unix_tcp::NamedSocketAddr;
use tokio::net::UnixStream;
use opentelemetry::{
@ -38,7 +40,6 @@ use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
struct WebMetrics {
request_counter: Counter<u64>,
@ -76,7 +77,7 @@ impl WebServer {
/// Run a web server
pub async fn run(
garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress,
addr: NamedSocketAddr,
root_domain: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
@ -116,13 +117,13 @@ impl WebServer {
info!("Web server listening on {}", addr);
match addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => {
NamedSocketAddr::Inet(addr) => {
Server::bind(&addr)
.serve(tcp_service)
.with_graceful_shutdown(shutdown_signal)
.await?
}
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
NamedSocketAddr::Unix(ref path) => {
if path.exists() {
fs::remove_file(path)?
}