Make all HTTP services optionnal

This commit is contained in:
Alex 2022-09-07 17:54:16 +02:00
parent 28d86e7602
commit 2559f63e9b
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
7 changed files with 299 additions and 273 deletions

View file

@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -52,15 +53,15 @@ impl AdminApiServer {
} }
} }
pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> { pub async fn run(
if let Some(bind_addr) = self.garage.config.admin.api_bind_addr { self,
let region = self.garage.config.s3_api.s3_region.clone(); bind_addr: SocketAddr,
ApiServer::new(region, self) shutdown_signal: impl Future<Output = ()>,
.run_server(bind_addr, shutdown_signal) ) -> Result<(), GarageError> {
.await let region = self.garage.config.s3_api.s3_region.clone();
} else { ApiServer::new(region, self)
Ok(()) .run_server(bind_addr, shutdown_signal)
} .await
} }
fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {

View file

@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -36,20 +37,13 @@ pub(crate) struct K2VApiEndpoint {
impl K2VApiServer { impl K2VApiServer {
pub async fn run( pub async fn run(
garage: Arc<Garage>, garage: Arc<Garage>,
bind_addr: SocketAddr,
s3_region: String,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
if let Some(cfg) = &garage.config.k2v_api { ApiServer::new(s3_region, K2VApiServer { garage })
let bind_addr = cfg.api_bind_addr;
ApiServer::new(
garage.config.s3_api.s3_region.clone(),
K2VApiServer { garage },
)
.run_server(bind_addr, shutdown_signal) .run_server(bind_addr, shutdown_signal)
.await .await
} else {
Ok(())
}
} }
} }

View file

@ -1,3 +1,4 @@
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -43,16 +44,13 @@ pub(crate) struct S3ApiEndpoint {
impl S3ApiServer { impl S3ApiServer {
pub async fn run( pub async fn run(
garage: Arc<Garage>, garage: Arc<Garage>,
addr: SocketAddr,
s3_region: String,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let addr = garage.config.s3_api.api_bind_addr; ApiServer::new(s3_region, S3ApiServer { garage })
.run_server(addr, shutdown_signal)
ApiServer::new( .await
garage.config.s3_api.s3_region.clone(),
S3ApiServer { garage },
)
.run_server(addr, shutdown_signal)
.await
} }
async fn handle_request_without_bucket( async fn handle_request_without_bucket(

View file

@ -9,7 +9,7 @@ use garage_util::error::Error;
use garage_api::admin::api_server::AdminApiServer; use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer; use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_web::run_web_server; use garage_web::WebServer;
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
use garage_api::k2v::api_server::K2VApiServer; use garage_api::k2v::api_server::K2VApiServer;
@ -30,6 +30,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration..."); info!("Loading configuration...");
let config = read_config(config_file)?; let config = read_config(config_file)?;
// ---- Initialize Garage internals ----
info!("Initializing background runner..."); info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c(); let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@ -44,7 +46,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?; init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?;
#[cfg(not(feature = "telemetry-otlp"))] #[cfg(not(feature = "telemetry-otlp"))]
warn!("Garage was built without OTLP exporter, admin.trace_sink is ignored."); error!("Garage was built without OTLP exporter, admin.trace_sink is ignored.");
} }
info!("Initialize Admin API server and metrics collector..."); info!("Initialize Admin API server and metrics collector...");
@ -56,53 +58,73 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Create admin RPC handler..."); info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone()); AdminRpcHandler::new(garage.clone());
info!("Initializing S3 API server..."); // ---- Launch public-facing API servers ----
let s3_api_server = tokio::spawn(S3ApiServer::run(
garage.clone(),
wait_from(watch_cancel.clone()),
));
#[cfg(feature = "k2v")] let mut servers = vec![];
let k2v_api_server = {
info!("Initializing K2V API server...");
tokio::spawn(K2VApiServer::run(
garage.clone(),
wait_from(watch_cancel.clone()),
))
};
info!("Initializing web server..."); if let Some(s3_bind_addr) = &config.s3_api.api_bind_addr {
let web_server = tokio::spawn(run_web_server( info!("Initializing S3 API server...");
garage.clone(), servers.push((
wait_from(watch_cancel.clone()), "S3 API",
)); tokio::spawn(S3ApiServer::run(
garage.clone(),
*s3_bind_addr,
config.s3_api.s3_region.clone(),
wait_from(watch_cancel.clone()),
)),
));
}
info!("Launching Admin API server..."); if config.k2v_api.is_some() {
let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone()))); #[cfg(feature = "k2v")]
{
info!("Initializing K2V API server...");
servers.push((
"K2V API",
tokio::spawn(K2VApiServer::run(
garage.clone(),
config.k2v_api.as_ref().unwrap().api_bind_addr,
config.s3_api.s3_region.clone(),
wait_from(watch_cancel.clone()),
)),
));
}
#[cfg(not(feature = "k2v"))]
error!("K2V is not enabled in this build, cannot start K2V API server");
}
if let Some(web_config) = &config.s3_web {
info!("Initializing web server...");
servers.push((
"Web",
tokio::spawn(WebServer::run(
garage.clone(),
web_config.bind_addr,
web_config.root_domain.clone(),
wait_from(watch_cancel.clone()),
)),
));
}
if let Some(admin_bind_addr) = &config.admin.api_bind_addr {
info!("Launching Admin API server...");
servers.push((
"Admin",
tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))),
));
}
// Stuff runs // Stuff runs
// When a cancel signal is sent, stuff stops // When a cancel signal is sent, stuff stops
if let Err(e) = s3_api_server.await? {
warn!("S3 API server exited with error: {}", e); // Collect stuff
} else { for (desc, join_handle) in servers {
info!("S3 API server exited without error."); if let Err(e) = join_handle.await? {
} error!("{} server exited with error: {}", desc, e);
#[cfg(feature = "k2v")] } else {
if let Err(e) = k2v_api_server.await? { info!("{} server exited without error.", desc);
warn!("K2V API server exited with error: {}", e); }
} else {
info!("K2V API server exited without error.");
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
} else {
info!("Web server exited without error.");
}
if let Err(e) = admin_server.await? {
warn!("Admin web server exited with error: {}", e);
} else {
info!("Admin API server exited without error.");
} }
// Remove RPC handlers for system to break reference cycles // Remove RPC handlers for system to break reference cycles

View file

@ -81,11 +81,10 @@ pub struct Config {
pub s3_api: S3ApiConfig, pub s3_api: S3ApiConfig,
/// Configuration for K2V api /// Configuration for K2V api
#[cfg(feature = "k2v")]
pub k2v_api: Option<K2VApiConfig>, pub k2v_api: Option<K2VApiConfig>,
/// Configuration for serving files as normal web server /// Configuration for serving files as normal web server
pub s3_web: WebConfig, pub s3_web: Option<WebConfig>,
/// Configuration for the admin API endpoint /// Configuration for the admin API endpoint
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
@ -96,7 +95,7 @@ pub struct Config {
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig { pub struct S3ApiConfig {
/// Address and port to bind for api serving /// Address and port to bind for api serving
pub api_bind_addr: SocketAddr, pub api_bind_addr: Option<SocketAddr>,
/// S3 region to use /// S3 region to use
pub s3_region: String, pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None, /// Suffix to remove from domain name to find bucket. If None,
@ -105,7 +104,6 @@ pub struct S3ApiConfig {
} }
/// Configuration for K2V api /// Configuration for K2V api
#[cfg(feature = "k2v")]
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct K2VApiConfig { pub struct K2VApiConfig {
/// Address and port to bind for api serving /// Address and port to bind for api serving

View file

@ -6,4 +6,4 @@ mod error;
pub use error::Error; pub use error::Error;
mod web_server; mod web_server;
pub use web_server::run_web_server; pub use web_server::WebServer;

View file

@ -57,90 +57,226 @@ impl WebMetrics {
} }
} }
/// Run a web server pub struct WebServer {
pub async fn run_web_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let addr = &garage.config.s3_web.bind_addr;
let metrics = Arc::new(WebMetrics::new());
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr();
async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| {
let garage = garage.clone();
let metrics = metrics.clone();
handle_request(garage, metrics, req, client_addr)
}))
}
});
let server = Server::bind(addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
info!("Web server listening on http://{}", addr);
graceful.await?;
Ok(())
}
async fn handle_request(
garage: Arc<Garage>, garage: Arc<Garage>,
metrics: Arc<WebMetrics>, metrics: Arc<WebMetrics>,
req: Request<Body>, root_domain: String,
addr: SocketAddr, }
) -> Result<Response<Body>, Infallible> {
info!("{} {} {}", addr, req.method(), req.uri());
// Lots of instrumentation impl WebServer {
let tracer = opentelemetry::global::tracer("garage"); /// Run a web server
let span = tracer pub async fn run(
.span_builder(format!("Web {} request", req.method())) garage: Arc<Garage>,
.with_trace_id(gen_trace_id()) addr: SocketAddr,
.with_attributes(vec![ root_domain: String,
KeyValue::new("method", format!("{}", req.method())), shutdown_signal: impl Future<Output = ()>,
KeyValue::new("uri", req.uri().to_string()), ) -> Result<(), GarageError> {
]) let metrics = Arc::new(WebMetrics::new());
.start(&tracer); let web_server = Arc::new(WebServer {
garage,
metrics,
root_domain,
});
let metrics_tags = &[KeyValue::new("method", req.method().to_string())]; let service = make_service_fn(|conn: &AddrStream| {
let web_server = web_server.clone();
// The actual handler let client_addr = conn.remote_addr();
let res = serve_file(garage, &req) async move {
.with_context(Context::current_with_span(span)) Ok::<_, Error>(service_fn(move |req: Request<Body>| {
.record_duration(&metrics.request_duration, &metrics_tags[..]) let web_server = web_server.clone();
.await;
// More instrumentation web_server.handle_request(req, client_addr)
metrics.request_counter.add(1, &metrics_tags[..]); }))
}
});
// Returning the result let server = Server::bind(&addr).serve(service);
match res { let graceful = server.with_graceful_shutdown(shutdown_signal);
Ok(res) => { info!("Web server listening on http://{}", addr);
debug!("{} {} {}", req.method(), res.status(), req.uri());
Ok(res) graceful.await?;
Ok(())
}
async fn handle_request(
self: Arc<Self>,
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
info!("{} {} {}", addr, req.method(), req.uri());
// Lots of instrumentation
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder(format!("Web {} request", req.method()))
.with_trace_id(gen_trace_id())
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().to_string()),
])
.start(&tracer);
let metrics_tags = &[KeyValue::new("method", req.method().to_string())];
// The actual handler
let res = self
.serve_file(&req)
.with_context(Context::current_with_span(span))
.record_duration(&self.metrics.request_duration, &metrics_tags[..])
.await;
// More instrumentation
self.metrics.request_counter.add(1, &metrics_tags[..]);
// Returning the result
match res {
Ok(res) => {
debug!("{} {} {}", req.method(), res.status(), req.uri());
Ok(res)
}
Err(error) => {
info!(
"{} {} {} {}",
req.method(),
error.http_status_code(),
req.uri(),
error
);
self.metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", error.http_status_code().to_string()),
],
);
Ok(error_to_res(error))
}
} }
Err(error) => { }
info!(
"{} {} {} {}", async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> {
req.method(), // Get http authority string (eg. [::1]:3902 or garage.tld:80)
error.http_status_code(), let authority = req
req.uri(), .headers()
error .get(HOST)
); .ok_or_bad_request("HOST header required")?
metrics.error_counter.add( .to_str()?;
1,
&[ // Get bucket
metrics_tags[0].clone(), let host = authority_to_host(authority)?;
KeyValue::new("status_code", error.http_status_code().to_string()),
], let bucket_name = host_to_bucket(&host, &self.root_domain).unwrap_or(&host);
); let bucket_id = self
Ok(error_to_res(error)) .garage
.bucket_alias_table
.get(&EmptyKey, &bucket_name.to_string())
.await?
.and_then(|x| x.state.take())
.ok_or(Error::NotFound)?;
// Check bucket isn't deleted and has website access enabled
let bucket = self
.garage
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.ok_or(Error::NotFound)?;
let website_config = bucket
.params()
.ok_or(Error::NotFound)?
.website_config
.get()
.as_ref()
.ok_or(Error::NotFound)?;
// Get path
let path = req.uri().path().to_string();
let index = &website_config.index_document;
let key = path_to_key(&path, index)?;
debug!(
"Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
bucket_name, bucket_id, key
);
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options_for_bucket(req, &bucket),
Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::bad_request("HTTP method not supported")),
}
.map_err(Error::from);
match ret_doc {
Err(error) => {
// For a HEAD or OPTIONS method, and for non-4xx errors,
// we don't return the error document as content,
// we return above and just return the error message
// by relying on err_to_res that is called when we return an Err.
if *req.method() == Method::HEAD
|| *req.method() == Method::OPTIONS
|| !error.http_status_code().is_client_error()
{
return Err(error);
}
// If no error document is set: just return the error directly
let error_document = match &website_config.error_document {
Some(ed) => ed.trim_start_matches('/').to_owned(),
None => return Err(error),
};
// We want to return the error document
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
.body(Body::empty())
.unwrap();
match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await
{
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
// so log it here
info!(
"{} {} {} {}",
req.method(),
req.uri(),
error.http_status_code(),
error
);
*error_doc.status_mut() = error.http_status_code();
error.add_headers(error_doc.headers_mut());
// Preserve error message in a special header
for error_line in error.to_string().split('\n') {
if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) {
error_doc.headers_mut().append("X-Garage-Error", v);
}
}
Ok(error_doc)
}
Err(error_doc_error) => {
warn!(
"Couldn't get error document {} for bucket {:?}: {}",
error_document, bucket_id, error_doc_error
);
Err(error)
}
}
}
Ok(mut resp) => {
// Maybe add CORS headers
if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
add_cors_headers(&mut resp, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
Ok(resp)
}
} }
} }
} }
@ -160,129 +296,6 @@ fn error_to_res(e: Error) -> Response<Body> {
http_error http_error
} }
async fn serve_file(garage: Arc<Garage>, req: &Request<Body>) -> Result<Response<Body>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
.headers()
.get(HOST)
.ok_or_bad_request("HOST header required")?
.to_str()?;
// Get bucket
let host = authority_to_host(authority)?;
let root = &garage.config.s3_web.root_domain;
let bucket_name = host_to_bucket(&host, root).unwrap_or(&host);
let bucket_id = garage
.bucket_alias_table
.get(&EmptyKey, &bucket_name.to_string())
.await?
.and_then(|x| x.state.take())
.ok_or(Error::NotFound)?;
// Check bucket isn't deleted and has website access enabled
let bucket = garage
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.ok_or(Error::NotFound)?;
let website_config = bucket
.params()
.ok_or(Error::NotFound)?
.website_config
.get()
.as_ref()
.ok_or(Error::NotFound)?;
// Get path
let path = req.uri().path().to_string();
let index = &website_config.index_document;
let key = path_to_key(&path, index)?;
debug!(
"Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
bucket_name, bucket_id, key
);
let ret_doc = match *req.method() {
Method::OPTIONS => handle_options_for_bucket(req, &bucket),
Method::HEAD => handle_head(garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::bad_request("HTTP method not supported")),
}
.map_err(Error::from);
match ret_doc {
Err(error) => {
// For a HEAD or OPTIONS method, and for non-4xx errors,
// we don't return the error document as content,
// we return above and just return the error message
// by relying on err_to_res that is called when we return an Err.
if *req.method() == Method::HEAD
|| *req.method() == Method::OPTIONS
|| !error.http_status_code().is_client_error()
{
return Err(error);
}
// If no error document is set: just return the error directly
let error_document = match &website_config.error_document {
Some(ed) => ed.trim_start_matches('/').to_owned(),
None => return Err(error),
};
// We want to return the error document
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
.body(Body::empty())
.unwrap();
match handle_get(garage, &req2, bucket_id, &error_document, None).await {
Ok(mut error_doc) => {
// The error won't be logged back in handle_request,
// so log it here
info!(
"{} {} {} {}",
req.method(),
req.uri(),
error.http_status_code(),
error
);
*error_doc.status_mut() = error.http_status_code();
error.add_headers(error_doc.headers_mut());
// Preserve error message in a special header
for error_line in error.to_string().split('\n') {
if let Ok(v) = HeaderValue::from_bytes(error_line.as_bytes()) {
error_doc.headers_mut().append("X-Garage-Error", v);
}
}
Ok(error_doc)
}
Err(error_doc_error) => {
warn!(
"Couldn't get error document {} for bucket {:?}: {}",
error_document, bucket_id, error_doc_error
);
Err(error)
}
}
}
Ok(mut resp) => {
// Maybe add CORS headers
if let Some(rule) = find_matching_cors_rule(&bucket, req)? {
add_cors_headers(&mut resp, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
Ok(resp)
}
}
}
/// Path to key /// Path to key
/// ///
/// Convert the provided path to the internal key /// Convert the provided path to the internal key