Add support for binding to unix domain sockets #640
6 changed files with 108 additions and 28 deletions
|
@ -1,5 +1,4 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -18,6 +17,7 @@ use prometheus::{Encoder, TextEncoder};
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_rpc::system::ClusterHealthStatus;
|
use garage_rpc::system::ClusterHealthStatus;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
use crate::generic_server::*;
|
use crate::generic_server::*;
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ impl AdminApiServer {
|
||||||
|
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
self,
|
self,
|
||||||
bind_addr: SocketAddr,
|
bind_addr: UnixOrTCPSocketAddress,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
let region = self.garage.config.s3_api.s3_region.clone();
|
let region = self.garage.config.s3_api.s3_region.clone();
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::net::SocketAddr;
|
use std::fs::{self, Permissions};
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -11,6 +12,10 @@ use hyper::service::{make_service_fn, service_fn};
|
||||||
use hyper::{Body, Request, Response, Server};
|
use hyper::{Body, Request, Response, Server};
|
||||||
use hyper::{HeaderMap, StatusCode};
|
use hyper::{HeaderMap, StatusCode};
|
||||||
|
|
||||||
|
use hyperlocal::UnixServerExt;
|
||||||
|
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
global,
|
global,
|
||||||
metrics::{Counter, ValueRecorder},
|
metrics::{Counter, ValueRecorder},
|
||||||
|
@ -21,6 +26,7 @@ use opentelemetry::{
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::forwarded_headers;
|
use garage_util::forwarded_headers;
|
||||||
use garage_util::metrics::{gen_trace_id, RecordDuration};
|
use garage_util::metrics::{gen_trace_id, RecordDuration};
|
||||||
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
|
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
|
||||||
fn name(&self) -> &'static str;
|
fn name(&self) -> &'static str;
|
||||||
|
@ -91,10 +97,10 @@ impl<A: ApiHandler> ApiServer<A> {
|
||||||
|
|
||||||
pub async fn run_server(
|
pub async fn run_server(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
bind_addr: SocketAddr,
|
bind_addr: UnixOrTCPSocketAddress,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
let service = make_service_fn(|conn: &AddrStream| {
|
let tcp_service = make_service_fn(|conn: &AddrStream| {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
|
|
||||||
let client_addr = conn.remote_addr();
|
let client_addr = conn.remote_addr();
|
||||||
|
@ -102,28 +108,60 @@ impl<A: ApiHandler> ApiServer<A> {
|
||||||
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
|
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
|
||||||
let this = this.clone();
|
let this = this.clone();
|
||||||
|
|
||||||
this.handler(req, client_addr)
|
this.handler(req, client_addr.to_string())
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let server = Server::bind(&bind_addr).serve(service);
|
let unix_service = make_service_fn(|_: &UnixStream| {
|
||||||
|
let this = self.clone();
|
||||||
|
|
||||||
|
let path = bind_addr.to_string();
|
||||||
|
async move {
|
||||||
|
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
|
||||||
|
let this = this.clone();
|
||||||
|
|
||||||
|
this.handler(req, path.clone())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
|
||||||
info!(
|
info!(
|
||||||
"{} API server listening on http://{}",
|
"{} API server listening on {}",
|
||||||
A::API_NAME_DISPLAY,
|
A::API_NAME_DISPLAY,
|
||||||
bind_addr
|
bind_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
graceful.await?;
|
match bind_addr {
|
||||||
|
UnixOrTCPSocketAddress::TCPSocket(addr) => {
|
||||||
|
Server::bind(&addr)
|
||||||
|
.serve(tcp_service)
|
||||||
|
.with_graceful_shutdown(shutdown_signal)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
|
||||||
|
if path.exists() {
|
||||||
|
fs::remove_file(path)?
|
||||||
|
}
|
||||||
|
|
||||||
|
let bound = Server::bind_unix(path)?;
|
||||||
|
|
||||||
|
fs::set_permissions(path, Permissions::from_mode(0o222))?;
|
||||||
|
|
||||||
|
bound
|
||||||
|
.serve(unix_service)
|
||||||
|
.with_graceful_shutdown(shutdown_signal)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler(
|
async fn handler(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
addr: SocketAddr,
|
addr: String,
|
||||||
) -> Result<Response<Body>, GarageError> {
|
) -> Result<Response<Body>, GarageError> {
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -9,6 +8,7 @@ use hyper::{Body, Method, Request, Response};
|
||||||
use opentelemetry::{trace::SpanRef, KeyValue};
|
use opentelemetry::{trace::SpanRef, KeyValue};
|
||||||
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ pub(crate) struct K2VApiEndpoint {
|
||||||
impl K2VApiServer {
|
impl K2VApiServer {
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bind_addr: SocketAddr,
|
bind_addr: UnixOrTCPSocketAddress,
|
||||||
s3_region: String,
|
s3_region: String,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -10,6 +9,7 @@ use hyper::{Body, Request, Response};
|
||||||
use opentelemetry::{trace::SpanRef, KeyValue};
|
use opentelemetry::{trace::SpanRef, KeyValue};
|
||||||
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
use garage_model::key_table::Key;
|
||||||
|
@ -44,7 +44,7 @@ pub(crate) struct S3ApiEndpoint {
|
||||||
impl S3ApiServer {
|
impl S3ApiServer {
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
addr: SocketAddr,
|
addr: UnixOrTCPSocketAddress,
|
||||||
s3_region: String,
|
s3_region: String,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
|
|
|
@ -79,7 +79,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
"S3 API",
|
"S3 API",
|
||||||
tokio::spawn(S3ApiServer::run(
|
tokio::spawn(S3ApiServer::run(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
*s3_bind_addr,
|
s3_bind_addr.clone(),
|
||||||
config.s3_api.s3_region.clone(),
|
config.s3_api.s3_region.clone(),
|
||||||
wait_from(watch_cancel.clone()),
|
wait_from(watch_cancel.clone()),
|
||||||
)),
|
)),
|
||||||
|
@ -94,7 +94,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
"K2V API",
|
"K2V API",
|
||||||
tokio::spawn(K2VApiServer::run(
|
tokio::spawn(K2VApiServer::run(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
config.k2v_api.as_ref().unwrap().api_bind_addr,
|
config.k2v_api.as_ref().unwrap().api_bind_addr.clone(),
|
||||||
config.s3_api.s3_region.clone(),
|
config.s3_api.s3_region.clone(),
|
||||||
wait_from(watch_cancel.clone()),
|
wait_from(watch_cancel.clone()),
|
||||||
)),
|
)),
|
||||||
|
@ -110,7 +110,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
"Web",
|
"Web",
|
||||||
tokio::spawn(WebServer::run(
|
tokio::spawn(WebServer::run(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
web_config.bind_addr,
|
web_config.bind_addr.clone(),
|
||||||
web_config.root_domain.clone(),
|
web_config.root_domain.clone(),
|
||||||
wait_from(watch_cancel.clone()),
|
wait_from(watch_cancel.clone()),
|
||||||
)),
|
)),
|
||||||
|
@ -121,7 +121,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
info!("Launching Admin API server...");
|
info!("Launching Admin API server...");
|
||||||
servers.push((
|
servers.push((
|
||||||
"Admin",
|
"Admin",
|
||||||
tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))),
|
tokio::spawn(
|
||||||
|
admin_server.run(admin_bind_addr.clone(), wait_from(watch_cancel.clone())),
|
||||||
|
),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
|
use std::fs::{self, Permissions};
|
||||||
|
use std::os::unix::prelude::PermissionsExt;
|
||||||
|
use std::{convert::Infallible, sync::Arc};
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
|
|
||||||
|
@ -9,6 +11,10 @@ use hyper::{
|
||||||
Body, Method, Request, Response, Server, StatusCode,
|
Body, Method, Request, Response, Server, StatusCode,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use hyperlocal::UnixServerExt;
|
||||||
|
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
global,
|
global,
|
||||||
metrics::{Counter, ValueRecorder},
|
metrics::{Counter, ValueRecorder},
|
||||||
|
@ -32,6 +38,7 @@ use garage_util::data::Uuid;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::forwarded_headers;
|
use garage_util::forwarded_headers;
|
||||||
use garage_util::metrics::{gen_trace_id, RecordDuration};
|
use garage_util::metrics::{gen_trace_id, RecordDuration};
|
||||||
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
struct WebMetrics {
|
struct WebMetrics {
|
||||||
request_counter: Counter<u64>,
|
request_counter: Counter<u64>,
|
||||||
|
@ -69,7 +76,7 @@ impl WebServer {
|
||||||
/// Run a web server
|
/// Run a web server
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
addr: SocketAddr,
|
addr: UnixOrTCPSocketAddress,
|
||||||
root_domain: String,
|
root_domain: String,
|
||||||
shutdown_signal: impl Future<Output = ()>,
|
shutdown_signal: impl Future<Output = ()>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
|
@ -80,7 +87,7 @@ impl WebServer {
|
||||||
root_domain,
|
root_domain,
|
||||||
});
|
});
|
||||||
|
|
||||||
let service = make_service_fn(|conn: &AddrStream| {
|
let tcp_service = make_service_fn(|conn: &AddrStream| {
|
||||||
let web_server = web_server.clone();
|
let web_server = web_server.clone();
|
||||||
|
|
||||||
let client_addr = conn.remote_addr();
|
let client_addr = conn.remote_addr();
|
||||||
|
@ -88,23 +95,56 @@ impl WebServer {
|
||||||
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
|
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
|
||||||
let web_server = web_server.clone();
|
let web_server = web_server.clone();
|
||||||
|
|
||||||
web_server.handle_request(req, client_addr)
|
web_server.handle_request(req, client_addr.to_string())
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let server = Server::bind(&addr).serve(service);
|
let unix_service = make_service_fn(|_: &UnixStream| {
|
||||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
let web_server = web_server.clone();
|
||||||
info!("Web server listening on http://{}", addr);
|
|
||||||
|
let path = addr.to_string();
|
||||||
|
async move {
|
||||||
|
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
|
||||||
|
let web_server = web_server.clone();
|
||||||
|
|
||||||
|
web_server.handle_request(req, path.clone())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
info!("Web server listening on {}", addr);
|
||||||
|
|
||||||
|
match addr {
|
||||||
|
UnixOrTCPSocketAddress::TCPSocket(addr) => {
|
||||||
|
Server::bind(&addr)
|
||||||
|
.serve(tcp_service)
|
||||||
|
.with_graceful_shutdown(shutdown_signal)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
|
||||||
|
if path.exists() {
|
||||||
|
fs::remove_file(path)?
|
||||||
|
}
|
||||||
|
|
||||||
|
let bound = Server::bind_unix(path)?;
|
||||||
|
|
||||||
|
fs::set_permissions(path, Permissions::from_mode(0o222))?;
|
||||||
|
|
||||||
|
bound
|
||||||
|
.serve(unix_service)
|
||||||
|
.with_graceful_shutdown(shutdown_signal)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
graceful.await?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_request(
|
async fn handle_request(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
addr: SocketAddr,
|
addr: String,
|
||||||
) -> Result<Response<Body>, Infallible> {
|
) -> Result<Response<Body>, Infallible> {
|
||||||
if let Ok(forwarded_for_ip_addr) =
|
if let Ok(forwarded_for_ip_addr) =
|
||||||
forwarded_headers::handle_forwarded_for_headers(req.headers())
|
forwarded_headers::handle_forwarded_for_headers(req.headers())
|
||||||
|
|
Loading…
Reference in a new issue