Compare commits

...

1 commit

Author SHA1 Message Date
ef508f4486
everywhere: use netapp with unix socket support 2023-11-06 00:18:00 +01:00
19 changed files with 573 additions and 224 deletions

537
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -39,6 +39,7 @@ futures-util = "0.3"
pin-project = "1.0.12" pin-project = "1.0.12"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-unix-tcp = "0.2.0"
form_urlencoded = "1.0.0" form_urlencoded = "1.0.0"
http = "0.2" http = "0.2"

View file

@ -7,6 +7,8 @@ use futures::future::Future;
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::trace::SpanRef; use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -17,7 +19,6 @@ use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus; use garage_rpc::system::ClusterHealthStatus;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::generic_server::*; use crate::generic_server::*;
@ -62,7 +63,7 @@ impl AdminApiServer {
pub async fn run( pub async fn run(
self, self,
bind_addr: UnixOrTCPSocketAddress, bind_addr: NamedSocketAddr,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone(); let region = self.garage.config.s3_api.s3_region.clone();

View file

@ -1,9 +1,11 @@
use std::net::SocketAddr; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio_unix_tcp::SocketAddr;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;

View file

@ -16,6 +16,8 @@ use hyperlocal::UnixServerExt;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{ use opentelemetry::{
global, global,
metrics::{Counter, ValueRecorder}, metrics::{Counter, ValueRecorder},
@ -26,7 +28,6 @@ use opentelemetry::{
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers; use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str; fn name(&self) -> &'static str;
@ -97,7 +98,7 @@ impl<A: ApiHandler> ApiServer<A> {
pub async fn run_server( pub async fn run_server(
self: Arc<Self>, self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress, bind_addr: NamedSocketAddr,
unix_bind_addr_mode: Option<u32>, unix_bind_addr_mode: Option<u32>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
@ -134,13 +135,13 @@ impl<A: ApiHandler> ApiServer<A> {
); );
match bind_addr { match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => { NamedSocketAddr::Inet(addr) => {
Server::bind(&addr) Server::bind(&addr)
.serve(tcp_service) .serve(tcp_service)
.with_graceful_shutdown(shutdown_signal) .with_graceful_shutdown(shutdown_signal)
.await? .await?
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { NamedSocketAddr::Unix(ref path) => {
if path.exists() { if path.exists() {
fs::remove_file(path)? fs::remove_file(path)?
} }

View file

@ -5,10 +5,11 @@ use async_trait::async_trait;
use futures::future::Future; use futures::future::Future;
use hyper::{Body, Method, Request, Response}; use hyper::{Body, Method, Request, Response};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{trace::SpanRef, KeyValue}; use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -37,7 +38,7 @@ pub(crate) struct K2VApiEndpoint {
impl K2VApiServer { impl K2VApiServer {
pub async fn run( pub async fn run(
garage: Arc<Garage>, garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress, bind_addr: NamedSocketAddr,
s3_region: String, s3_region: String,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {

View file

@ -6,10 +6,11 @@ use futures::future::Future;
use hyper::header; use hyper::header;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use tokio_unix_tcp::NamedSocketAddr;
use opentelemetry::{trace::SpanRef, KeyValue}; use opentelemetry::{trace::SpanRef, KeyValue};
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::Key; use garage_model::key_table::Key;
@ -46,7 +47,7 @@ pub(crate) struct S3ApiEndpoint {
impl S3ApiServer { impl S3ApiServer {
pub async fn run( pub async fn run(
garage: Arc<Garage>, garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress, addr: NamedSocketAddr,
s3_region: String, s3_region: String,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {

View file

@ -52,8 +52,9 @@ toml = "0.6"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-unix-tcp = "0.2.0"
netapp = "0.10" netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = { version = "0.10", optional = true } opentelemetry-prometheus = { version = "0.10", optional = true }

View file

@ -2,6 +2,8 @@ use std::path::PathBuf;
use garage_util::error::*; use garage_util::error::*;
use tokio_unix_tcp::NamedSocketAddr;
pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)"; pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)";
pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
@ -21,12 +23,17 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
let idstr = hex::encode(node_id); let idstr = hex::encode(node_id);
println!("{}", idstr); println!("{}", idstr);
match config.rpc_bind_addr {
NamedSocketAddr::Inet(addr) => {
if !quiet { if !quiet {
warn!("WARNING: I don't know the public address to reach this node."); warn!("WARNING: I don't know the public address to reach this node.");
warn!("In all of the instructions below, replace 127.0.0.1:{} by the appropriate address and port.", config.rpc_bind_addr.port()); warn!("In all of the instructions below, replace 127.0.0.1:{} by the appropriate address and port.", addr.port());
} }
format!("{}@127.0.0.1:{}", idstr, config.rpc_bind_addr.port()) format!("{}@127.0.0.1:{}", idstr, addr.port())
}
NamedSocketAddr::Unix(path) => format!("{}@{}", idstr, path.to_string_lossy()),
}
}; };
if !quiet { if !quiet {

View file

@ -21,6 +21,7 @@ compile_error!("Only one of bundled-libs and system-libs Cargo features must be
compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite."); compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net;
use std::path::PathBuf; use std::path::PathBuf;
use structopt::StructOpt; use structopt::StructOpt;
@ -36,6 +37,8 @@ use garage_rpc::*;
use garage_model::helper::error::Error as HelperError; use garage_model::helper::error::Error as HelperError;
use tokio_unix_tcp::NamedSocketAddr;
use admin::*; use admin::*;
use cli::*; use cli::*;
@ -218,8 +221,8 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse the address of the target host // Find and parse the address of the target host
let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host { let (id, addr, is_default_addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?; let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port> or <pubkey>@<path> for unix domain sockets.", h))?;
(id, addrs[0], false) (id, addrs[0].clone(), false)
} else { } else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?; .err_context(READ_KEY_ERROR)?;
@ -230,18 +233,24 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.ok_or_message("unable to resolve rpc_public_addr specified in config file")? .ok_or_message("unable to resolve rpc_public_addr specified in config file")?
.next() .next()
.ok_or_message("unable to resolve rpc_public_addr specified in config file")?; .ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a, false) (node_id, NamedSocketAddr::Inet(a), false)
} else { } else {
let default_addr = SocketAddr::new( let default_addr =
"127.0.0.1".parse().unwrap(), config
config.as_ref().unwrap().rpc_bind_addr.port(), .as_ref()
); .unwrap()
.rpc_bind_addr
.clone()
.map_inet(|socket_addr| {
net::SocketAddr::new("127.0.0.1".parse().unwrap(), socket_addr.port())
});
(node_id, default_addr, true) (node_id, default_addr, true)
} }
}; };
// Connect to target host // Connect to target host
if let Err(e) = netapp.clone().try_connect(addr, id).await { if let Err(e) = netapp.clone().try_connect(addr.clone().into(), id).await {
if is_default_addr { if is_default_addr {
warn!( warn!(
"Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.", "Tried to contact Garage node at default address {}, which didn't work. If that address is wrong, consider setting rpc_public_addr in your config file.",

View file

@ -39,7 +39,7 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17" opentelemetry = "0.17"
netapp = "0.10" netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
[features] [features]
default = [ "sled", "lmdb", "sqlite" ] default = [ "sled", "lmdb", "sqlite" ]

View file

@ -47,9 +47,10 @@ futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] } tokio-stream = { version = "0.1", features = ["net"] }
tokio-unix-tcp = "0.2.0"
opentelemetry = "0.17" opentelemetry = "0.17"
netapp = { version = "0.10", features = ["telemetry"] } netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879", features = ["telemetry"] }
[features] [features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]

View file

@ -1,8 +1,9 @@
//! Module containing structs related to membership management //! Module containing structs related to membership management
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{self, IpAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -16,6 +17,8 @@ use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_unix_tcp::{NamedSocketAddr, SocketAddr};
use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*; use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@ -77,7 +80,7 @@ impl Rpc for SystemRpc {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct PeerList(Vec<(Uuid, SocketAddr)>); pub struct PeerList(Vec<(Uuid, NamedSocketAddr)>);
impl garage_util::migrate::InitialFormat for PeerList {} impl garage_util::migrate::InitialFormat for PeerList {}
/// This node's membership manager /// This node's membership manager
@ -97,9 +100,9 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: NamedSocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>, rpc_public_addr: Option<SocketBindAddr>,
bootstrap_peers: Vec<String>, bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
@ -289,10 +292,13 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor); let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring)); let (update_ring, ring) = watch::channel(Arc::new(ring));
let rpc_public_addr = match &config.rpc_public_addr { let rpc_public_addr: Option<NamedSocketAddr> = match &config.rpc_public_addr {
Some(a_str) => { Some(a_str) => {
// FIXME: Remove this unwrap
match NamedSocketAddr::from_str(a_str).unwrap() {
NamedSocketAddr::Inet(address) => {
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
match a_str.to_socket_addrs() { match address.to_socket_addrs() {
Err(e) => { Err(e) => {
error!( error!(
"Cannot resolve rpc_public_addr {} from config file: {}.", "Cannot resolve rpc_public_addr {} from config file: {}.",
@ -303,19 +309,30 @@ impl System {
Ok(a) => { Ok(a) => {
let a = a.collect::<Vec<_>>(); let a = a.collect::<Vec<_>>();
if a.is_empty() { if a.is_empty() {
error!("rpc_public_addr {} resolve to no known IP address", a_str); error!(
"rpc_public_addr {} resolve to no known IP address",
a_str
);
} }
if a.len() > 1 { if a.len() > 1 {
warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
} }
a.into_iter().next() a.into_iter().next().map(NamedSocketAddr::Inet)
} }
} }
} }
NamedSocketAddr::Unix(path) => Some(path.into()),
}
}
None => { None => {
let addr = let addr: Option<NamedSocketAddr> = match &config.rpc_bind_addr {
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); NamedSocketAddr::Inet(address) => {
if let Some(a) = addr { get_default_ip().map(|ip| net::SocketAddr::new(ip, address.port()).into())
}
NamedSocketAddr::Unix(path) => Some(path.to_owned().into()),
};
if let Some(a) = &addr {
warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
} }
addr addr
@ -368,7 +385,7 @@ impl System {
system_endpoint, system_endpoint,
replication_mode, replication_mode,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr.clone().into(),
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
@ -392,7 +409,7 @@ impl System {
join!( join!(
self.netapp self.netapp
.clone() .clone()
.listen(self.rpc_listen_addr, None, must_exit.clone()), .listen(self.rpc_listen_addr.clone(), None, must_exit.clone()),
self.fullmesh.clone().run(must_exit.clone()), self.fullmesh.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()), self.discovery_loop(must_exit.clone()),
self.status_exchange_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()),
@ -410,7 +427,7 @@ impl System {
.iter() .iter()
.map(|n| KnownNodeInfo { .map(|n| KnownNodeInfo {
id: n.id.into(), id: n.id.into(),
addr: n.addr, addr: n.addr.clone().to_socket_addr(),
is_up: n.is_up(), is_up: n.is_up(),
last_seen_secs_ago: n last_seen_secs_ago: n
.last_seen .last_seen
@ -448,12 +465,17 @@ impl System {
})?; })?;
let mut errors = vec![]; let mut errors = vec![];
for addr in addrs.iter() { for addr in addrs.iter() {
match self.netapp.clone().try_connect(*addr, pubkey).await { match self
.netapp
.clone()
.try_connect(addr.clone().into(), pubkey)
.await
{
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(e) => { Err(e) => {
errors.push(( errors.push((
*addr, addr.clone(),
Error::Message(connect_error_message(*addr, pubkey, e)), Error::Message(connect_error_message(addr.clone().into(), pubkey, e)),
)); ));
} }
} }
@ -742,7 +764,12 @@ impl System {
// Add peer list from list stored on disk // Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await { if let Ok(peers) = self.persist_peer_list.load_async().await {
ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) ping_list.extend(
peers
.0
.iter()
.map(|(id, addr)| ((*id).into(), addr.clone())),
)
} }
// Fetch peer list from Consul // Fetch peer list from Consul
@ -783,8 +810,13 @@ impl System {
for (node_id, node_addr) in ping_list { for (node_id, node_addr) in ping_list {
let self2 = self.clone(); let self2 = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { if let Err(e) = self2
error!("{}", connect_error_message(node_addr, node_id, e)); .netapp
.clone()
.try_connect(node_addr.clone(), node_id)
.await
{
error!("{}", connect_error_message(node_addr.into(), node_id, e));
} }
}); });
} }
@ -814,7 +846,7 @@ impl System {
.fullmesh .fullmesh
.get_peer_list() .get_peer_list()
.iter() .iter()
.map(|n| (n.id.into(), n.addr)) .map(|n| (n.id.into(), n.addr.clone()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Before doing it, we read the current peer list file (if it exists) // Before doing it, we read the current peer list file (if it exists)
@ -963,7 +995,7 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip()) .map(|a| a.ip())
} }
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, NamedSocketAddr)> {
let mut ret = vec![]; let mut ret = vec![];
for peer in peers.iter() { for peer in peers.iter() {

View file

@ -39,8 +39,9 @@ toml = "0.6"
futures = "0.3" futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-unix-tcp = { version = "0.2.0", features = ["serde"] }
netapp = "0.10" netapp = { git = "https://git.deuxfleurs.fr/networkException/netapp.git", rev = "184ad39eeaf5d8615b8800298d8c7e564bb68879" }
http = "0.2" http = "0.2"
hyper = "0.14" hyper = "0.14"

View file

@ -1,13 +1,13 @@
//! Contains type and functions related to Garage configuration file //! Contains type and functions related to Garage configuration file
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io::Read; use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use tokio_unix_tcp::NamedSocketAddr;
use serde::{de, Deserialize}; use serde::{de, Deserialize};
use crate::error::Error; use crate::error::Error;
use crate::socket_address::UnixOrTCPSocketAddress;
/// Represent the whole configuration /// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
@ -51,7 +51,8 @@ pub struct Config {
pub rpc_secret_file: Option<String>, pub rpc_secret_file: Option<String>,
/// Address to bind for RPC /// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, #[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub rpc_bind_addr: NamedSocketAddr,
/// Public IP address of this node /// Public IP address of this node
pub rpc_public_addr: Option<String>, pub rpc_public_addr: Option<String>,
@ -130,7 +131,8 @@ pub struct DataDir {
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig { pub struct S3ApiConfig {
/// Address and port to bind for api serving /// Address and port to bind for api serving
pub api_bind_addr: Option<UnixOrTCPSocketAddress>, #[serde(deserialize_with = "NamedSocketAddr::deserialize_from_option_str")]
pub api_bind_addr: Option<NamedSocketAddr>,
/// S3 region to use /// S3 region to use
pub s3_region: String, pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None, /// Suffix to remove from domain name to find bucket. If None,
@ -142,14 +144,16 @@ pub struct S3ApiConfig {
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct K2VApiConfig { pub struct K2VApiConfig {
/// Address and port to bind for api serving /// Address and port to bind for api serving
pub api_bind_addr: UnixOrTCPSocketAddress, #[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub api_bind_addr: NamedSocketAddr,
} }
/// Configuration for serving files as normal web server /// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct WebConfig { pub struct WebConfig {
/// Address and port to bind for web serving /// Address and port to bind for web serving
pub bind_addr: UnixOrTCPSocketAddress, #[serde(deserialize_with = "NamedSocketAddr::deserialize_from_str")]
pub bind_addr: NamedSocketAddr,
/// Suffix to remove from domain name to find bucket /// Suffix to remove from domain name to find bucket
pub root_domain: String, pub root_domain: String,
} }
@ -158,7 +162,8 @@ pub struct WebConfig {
#[derive(Deserialize, Debug, Clone, Default)] #[derive(Deserialize, Debug, Clone, Default)]
pub struct AdminConfig { pub struct AdminConfig {
/// Address and port to bind for admin API serving /// Address and port to bind for admin API serving
pub api_bind_addr: Option<UnixOrTCPSocketAddress>, #[serde(deserialize_with = "NamedSocketAddr::deserialize_from_option_str")]
pub api_bind_addr: Option<NamedSocketAddr>,
/// Bearer token to use to scrape metrics /// Bearer token to use to scrape metrics
pub metrics_token: Option<String>, pub metrics_token: Option<String>,

View file

@ -14,7 +14,6 @@ pub mod forwarded_headers;
pub mod metrics; pub mod metrics;
pub mod migrate; pub mod migrate;
pub mod persister; pub mod persister;
pub mod socket_address;
pub mod time; pub mod time;
pub mod tranquilizer; pub mod tranquilizer;
pub mod version; pub mod version;

View file

@ -1,44 +0,0 @@
use std::fmt::{Debug, Display, Formatter};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use serde::de::Error;
use serde::{Deserialize, Deserializer};
#[derive(Debug, Clone)]
pub enum UnixOrTCPSocketAddress {
TCPSocket(SocketAddr),
UnixSocket(PathBuf),
}
impl Display for UnixOrTCPSocketAddress {
fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
match self {
UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address),
UnixOrTCPSocketAddress::UnixSocket(path) => {
write!(formatter, "http+unix://{}", path.to_string_lossy())
}
}
}
}
impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
let string = string.as_str();
if string.starts_with("/") {
Ok(UnixOrTCPSocketAddress::UnixSocket(
PathBuf::from_str(string).map_err(Error::custom)?,
))
} else {
Ok(UnixOrTCPSocketAddress::TCPSocket(
SocketAddr::from_str(string).map_err(Error::custom)?,
))
}
}
}

View file

@ -30,5 +30,6 @@ hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "st
hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] }
tokio = { version = "1.0", default-features = false, features = ["net"] } tokio = { version = "1.0", default-features = false, features = ["net"] }
tokio-unix-tcp = "0.2.0"
opentelemetry = "0.17" opentelemetry = "0.17"

View file

@ -13,6 +13,8 @@ use hyper::{
use hyperlocal::UnixServerExt; use hyperlocal::UnixServerExt;
use tokio_unix_tcp::NamedSocketAddr;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use opentelemetry::{ use opentelemetry::{
@ -38,7 +40,6 @@ use garage_util::data::Uuid;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers; use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress;
struct WebMetrics { struct WebMetrics {
request_counter: Counter<u64>, request_counter: Counter<u64>,
@ -76,7 +77,7 @@ impl WebServer {
/// Run a web server /// Run a web server
pub async fn run( pub async fn run(
garage: Arc<Garage>, garage: Arc<Garage>,
addr: UnixOrTCPSocketAddress, addr: NamedSocketAddr,
root_domain: String, root_domain: String,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
@ -116,13 +117,13 @@ impl WebServer {
info!("Web server listening on {}", addr); info!("Web server listening on {}", addr);
match addr { match addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => { NamedSocketAddr::Inet(addr) => {
Server::bind(&addr) Server::bind(&addr)
.serve(tcp_service) .serve(tcp_service)
.with_graceful_shutdown(shutdown_signal) .with_graceful_shutdown(shutdown_signal)
.await? .await?
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { NamedSocketAddr::Unix(ref path) => {
if path.exists() { if path.exists() {
fs::remove_file(path)? fs::remove_file(path)?
} }