use std::fs::{self, Permissions}; use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; use http_body_util::BodyExt; use hyper::header::HeaderValue; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{body::Incoming as IncomingBody, Request, Response}; use hyper::{HeaderMap, StatusCode}; use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream}; use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, trace::{FutureExt, SpanRef, TraceContextExt, Tracer}, Context, KeyValue, }; use garage_util::error::Error as GarageError; use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, BytesBody}; pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); } pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_status_code(&self) -> StatusCode; fn add_http_headers(&self, header_map: &mut HeaderMap); fn http_body(&self, garage_region: &str, path: &str) -> BytesBody; } #[async_trait] pub(crate) trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; type Endpoint: ApiEndpoint; type Error: ApiError; fn parse_endpoint(&self, r: &Request) -> Result; async fn handle( &self, req: Request, endpoint: Self::Endpoint, ) -> Result>, Self::Error>; } pub(crate) struct ApiServer { region: String, api_handler: A, // Metrics request_counter: Counter, error_counter: Counter, request_duration: ValueRecorder, } impl ApiServer { pub fn new(region: String, api_handler: A) -> Arc { let meter = global::meter("garage/api"); Arc::new(Self { region, api_handler, request_counter: meter .u64_counter(format!("api.{}.request_counter", A::API_NAME)) .with_description(format!( "Number of API calls to the various {} API endpoints", A::API_NAME_DISPLAY )) .init(), error_counter: meter .u64_counter(format!("api.{}.error_counter", A::API_NAME)) .with_description(format!( "Number of API calls to the various {} API endpoints that resulted in errors", A::API_NAME_DISPLAY )) .init(), request_duration: meter .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME)) .with_description(format!( "Duration of API calls to the various {} API endpoints", A::API_NAME_DISPLAY )) .init(), }) } pub async fn run_server( self: Arc, bind_addr: UnixOrTCPSocketAddress, unix_bind_addr_mode: Option, shutdown_signal: impl Future, ) -> Result<(), GarageError> { info!( "{} API server listening on {}", A::API_NAME_DISPLAY, bind_addr ); match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { let listener = TcpListener::bind(addr).await?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); server_loop(listener, handler, shutdown_signal).await } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } let listener = UnixListener::bind(path)?; let listener = UnixListenerOn(listener, path.display().to_string()); fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; let handler = move |request, socketaddr| self.clone().handler(request, socketaddr); server_loop(listener, handler, shutdown_signal).await } } } async fn handler( self: Arc, req: Request, addr: String, ) -> Result>, http::Error> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { info!( "{} (via {}) {} {}", forwarded_for_ip_addr, addr, req.method(), uri ); } else { info!("{} {} {}", addr, req.method(), uri); } debug!("{:?}", req); let tracer = opentelemetry::global::tracer("garage"); let span = tracer .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY)) .with_trace_id(gen_trace_id()) .with_attributes(vec![ KeyValue::new("method", format!("{}", req.method())), KeyValue::new("uri", req.uri().to_string()), ]) .start(&tracer); let res = self .handler_stage2(req) .with_context(Context::current_with_span(span)) .await; match res { Ok(x) => { debug!("{} {:?}", x.status(), x.headers()); Ok(x) } Err(e) => { let body = e.http_body(&self.region, uri.path()); let mut http_error_builder = Response::builder().status(e.http_status_code()); if let Some(header_map) = http_error_builder.headers_mut() { e.add_http_headers(header_map) } let http_error = http_error_builder.body(body)?; if e.http_status_code().is_server_error() { warn!("Response: error {}, {}", e.http_status_code(), e); } else { info!("Response: error {}, {}", e.http_status_code(), e); } Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!())))) } } } async fn handler_stage2( &self, req: Request, ) -> Result>, A::Error> { let endpoint = self.api_handler.parse_endpoint(&req)?; debug!("Endpoint: {}", endpoint.name()); let current_context = Context::current(); let current_span = current_context.span(); current_span.update_name::(format!( "{} API {}", A::API_NAME_DISPLAY, endpoint.name() )); current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); endpoint.add_span_attributes(current_span); let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; let res = self .api_handler .handle(req, endpoint) .record_duration(&self.request_duration, &metrics_tags[..]) .await; self.request_counter.add(1, &metrics_tags[..]); let status_code = match &res { Ok(r) => r.status(), Err(e) => e.http_status_code(), }; if status_code.is_client_error() || status_code.is_server_error() { self.error_counter.add( 1, &[ metrics_tags[0].clone(), KeyValue::new("status_code", status_code.as_str().to_string()), ], ); } res } } // ==== helper functions ==== #[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; } #[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { self.accept() .await .map(|(stream, addr)| (stream, addr.to_string())) } } pub struct UnixListenerOn(pub UnixListener, pub String); #[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { self.0 .accept() .await .map(|(stream, _addr)| (stream, self.1.clone())) } } pub async fn server_loop( listener: A, handler: H, shutdown_signal: impl Future, ) -> Result<(), GarageError> where A: Accept, H: Fn(Request, String) -> F + Send + Sync + Clone + 'static, F: Future>, http::Error>> + Send + 'static, E: Send + Sync + std::error::Error + 'static, { tokio::pin!(shutdown_signal); let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel(); let connection_collector = tokio::spawn(async move { let mut collection = FuturesUnordered::new(); loop { let collect_next = async { if collection.is_empty() { futures::future::pending().await } else { collection.next().await } }; tokio::select! { result = collect_next => { trace!("HTTP connection finished: {:?}", result); } new_fut = conn_out.recv() => { match new_fut { Some(f) => collection.push(f), None => break, } } } } debug!("Collecting last open HTTP connections."); while let Some(conn_res) = collection.next().await { trace!("HTTP connection finished: {:?}", conn_res); } debug!("No more HTTP connections to collect"); }); loop { let (stream, client_addr) = tokio::select! { acc = listener.accept() => acc?, _ = &mut shutdown_signal => break, }; let io = TokioIo::new(stream); let handler = handler.clone(); let serve = move |req: Request| handler(req, client_addr.clone()); let fut = tokio::task::spawn(async move { let io = Box::pin(io); if let Err(e) = http1::Builder::new() .serve_connection(io, service_fn(serve)) .await { debug!("Error handling HTTP connection: {}", e); } }); conn_in.send(fut)?; } connection_collector.await?; Ok(()) }