Refactor admin API to be in api/admin and use common code

This commit is contained in:
Alex 2022-05-05 10:29:45 +02:00
parent 5768bf3622
commit 633958c7b1
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
16 changed files with 228 additions and 220 deletions

26
Cargo.lock generated
View file

@ -839,7 +839,6 @@ dependencies = [
"chrono",
"futures",
"futures-util",
"garage_admin",
"garage_api",
"garage_model 0.7.0",
"garage_rpc 0.7.0",
@ -853,7 +852,11 @@ dependencies = [
"hyper",
"kuska-sodiumoxide",
"netapp 0.4.4",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"pretty_env_logger",
"prometheus",
"rand 0.8.5",
"rmp-serde 0.15.5",
"serde",
@ -868,23 +871,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "garage_admin"
version = "0.7.0"
dependencies = [
"futures",
"futures-util",
"garage_util 0.7.0",
"hex",
"http",
"hyper",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"prometheus",
"tracing",
]
[[package]]
name = "garage_api"
version = "0.7.0"
@ -914,8 +900,11 @@ dependencies = [
"multer",
"nom",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"percent-encoding",
"pin-project 1.0.10",
"prometheus",
"quick-xml",
"roxmltree",
"serde",
@ -1040,7 +1029,6 @@ dependencies = [
"bytes 1.1.0",
"futures",
"futures-util",
"garage_admin",
"garage_util 0.7.0",
"gethostname",
"hex",

View file

@ -5,7 +5,6 @@ members = [
"src/table",
"src/block",
"src/model",
"src/admin",
"src/api",
"src/web",
"src/garage"

View file

@ -1,29 +0,0 @@
[package]
name = "garage_admin"
version = "0.7.0"
authors = ["Maximilien Richer <code@mricher.fr>"]
edition = "2018"
license = "AGPL-3.0"
description = "Administration and metrics REST HTTP server for Garage"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
[lib]
path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
hex = "0.4"
futures = "0.3"
futures-util = "0.3"
http = "0.2"
hyper = "0.14"
tracing = "0.1.30"
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10"
opentelemetry-otlp = "0.10"
prometheus = "0.13"

View file

@ -1,6 +0,0 @@
//! Crate for handling the admin and metric HTTP APIs
#[macro_use]
extern crate tracing;
pub mod metrics;
pub mod tracing_setup;

View file

@ -1,146 +0,0 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::SystemTime;
use futures::future::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use opentelemetry::{
global,
metrics::{BoundCounter, BoundValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer},
Context,
};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_util::error::Error as GarageError;
use garage_util::metrics::*;
// serve_req on metric endpoint
async fn serve_req(
req: Request<Body>,
admin_server: Arc<AdminServer>,
) -> Result<Response<Body>, hyper::Error> {
debug!("Receiving request at path {}", req.uri());
let request_start = SystemTime::now();
admin_server.metrics.http_counter.add(1);
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
admin_server.exporter.registry().gather()
});
encoder.encode(&metric_families, &mut buffer).unwrap();
admin_server
.metrics
.http_body_gauge
.record(buffer.len() as u64);
Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
_ => Response::builder()
.status(404)
.body(Body::from("Not implemented"))
.unwrap(),
};
admin_server
.metrics
.http_req_histogram
.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
Ok(response)
}
// AdminServer hold the admin server internal admin_server and the metric exporter
pub struct AdminServer {
exporter: PrometheusExporter,
metrics: AdminServerMetrics,
}
// GarageMetricadmin_server holds the metrics counter definition for Garage
// FIXME: we would rather have that split up among the different libraries?
struct AdminServerMetrics {
http_counter: BoundCounter<u64>,
http_body_gauge: BoundValueRecorder<u64>,
http_req_histogram: BoundValueRecorder<f64>,
}
impl AdminServer {
/// init initilialize the AdminServer and background metric server
pub fn init() -> AdminServer {
let exporter = opentelemetry_prometheus::exporter().init();
let meter = global::meter("garage/admin_server");
AdminServer {
exporter,
metrics: AdminServerMetrics {
http_counter: meter
.u64_counter("admin.http_requests_total")
.with_description("Total number of HTTP requests made.")
.init()
.bind(&[]),
http_body_gauge: meter
.u64_value_recorder("admin.http_response_size_bytes")
.with_description("The metrics HTTP response sizes in bytes.")
.init()
.bind(&[]),
http_req_histogram: meter
.f64_value_recorder("admin.http_request_duration_seconds")
.with_description("The HTTP request latencies in seconds.")
.init()
.bind(&[]),
},
}
}
/// run execute the admin server on the designated HTTP port and listen for requests
pub async fn run(
self,
bind_addr: SocketAddr,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let admin_server = Arc::new(self);
// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(move |_conn| {
let admin_server = admin_server.clone();
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer
.span_builder("admin/request")
.with_trace_id(gen_trace_id())
.start(&tracer);
serve_req(req, admin_server.clone())
.with_context(Context::current_with_span(span))
}))
}
});
let server = Server::bind(&bind_addr).serve(make_svc);
let graceful = server.with_graceful_shutdown(shutdown_signal);
info!("Admin server listening on http://{}", bind_addr);
graceful.await?;
Ok(())
}
}

View file

@ -54,6 +54,9 @@ quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"
opentelemetry = "0.17"
opentelemetry-prometheus = "0.10"
opentelemetry-otlp = "0.10"
prometheus = "0.13"
[features]
k2v = [ "garage_util/k2v", "garage_model/k2v" ]

128
src/api/admin/api_server.rs Normal file
View file

@ -0,0 +1,128 @@
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::Future;
use http::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
use opentelemetry::trace::{SpanRef, Tracer};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
use garage_util::error::Error as GarageError;
use crate::error::*;
use crate::generic_server::*;
use crate::admin::router::{Authorization, Endpoint};
pub struct AdminApiServer {
garage: Arc<Garage>,
exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
}
impl AdminApiServer {
pub fn new(garage: Arc<Garage>) -> Self {
let exporter = opentelemetry_prometheus::exporter().init();
let cfg = &garage.config.admin;
let metrics_token = cfg
.metrics_token
.as_ref()
.map(|tok| format!("Bearer {}", tok));
let admin_token = cfg
.admin_token
.as_ref()
.map(|tok| format!("Bearer {}", tok));
Self {
garage,
exporter,
metrics_token,
admin_token,
}
}
pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<(), GarageError> {
if let Some(bind_addr) = self.garage.config.admin.api_bind_addr {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
.run_server(bind_addr, shutdown_signal)
.await
} else {
Ok(())
}
}
fn handle_metrics(&self) -> Result<Response<Body>, Error> {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
self.exporter.registry().gather()
});
encoder
.encode(&metric_families, &mut buffer)
.ok_or_internal_error("Could not serialize metrics")?;
Ok(Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))?)
}
}
#[async_trait]
impl ApiHandler for AdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
type Endpoint = Endpoint;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> {
Endpoint::from_request(req)
}
async fn handle(
&self,
req: Request<Body>,
endpoint: Endpoint,
) -> Result<Response<Body>, Error> {
let expected_auth_header = match endpoint.authorization_type() {
Authorization::MetricsToken => self.metrics_token.as_ref(),
Authorization::AdminToken => self.admin_token.as_ref(),
};
if let Some(h) = expected_auth_header {
match req.headers().get("Authorization") {
None => Err(Error::Forbidden(
"Authorization token must be provided".into(),
)),
Some(v) if v.to_str().map(|hv| hv == h).unwrap_or(false) => Ok(()),
_ => Err(Error::Forbidden(
"Invalid authorization token provided".into(),
)),
}?;
}
match endpoint {
Endpoint::Metrics => self.handle_metrics(),
_ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet",
endpoint.name()
))),
}
}
}
impl ApiEndpoint for Endpoint {
fn name(&self) -> &'static str {
Endpoint::name(self)
}
fn add_span_attributes(&self, _span: SpanRef<'_>) {}
}

2
src/api/admin/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod api_server;
mod router;

59
src/api/admin/router.rs Normal file
View file

@ -0,0 +1,59 @@
use crate::error::*;
use hyper::{Method, Request};
use crate::router_macros::router_match;
pub enum Authorization {
MetricsToken,
AdminToken,
}
router_match! {@func
/// List of all Admin API endpoints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
Metrics,
Options,
GetClusterStatus,
GetClusterLayout,
UpdateClusterLayout,
ApplyClusterLayout,
RevertClusterLayout,
}}
impl Endpoint {
/// Determine which S3 endpoint a request is for using the request, and a bucket which was
/// possibly extracted from the Host header.
/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
let path = req.uri().path();
use Endpoint::*;
let res = match (req.method(), path) {
(&Method::OPTIONS, _) => Options,
(&Method::GET, "/metrics") => Metrics,
(&Method::GET, "/status") => GetClusterStatus,
(&Method::GET, "/layout") => GetClusterLayout,
(&Method::POST, "/layout") => UpdateClusterLayout,
(&Method::POST, "/layout/apply") => ApplyClusterLayout,
(&Method::POST, "/layout/revert") => RevertClusterLayout,
(m, p) => {
return Err(Error::BadRequest(format!(
"Unknown API endpoint: {} {}",
m, p
)))
}
};
Ok(res)
}
/// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization {
match self {
Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken,
}
}
}

View file

@ -12,6 +12,7 @@ mod router_macros;
/// This mode is public only to help testing. Don't expect stability here
pub mod signature;
pub mod admin;
#[cfg(feature = "k2v")]
pub mod k2v;
pub mod s3;

View file

@ -27,7 +27,6 @@ garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
garage_admin = { version = "0.7.0", path = "../admin" }
bytes = "1.0"
git-version = "0.3.4"
@ -54,6 +53,11 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
#netapp = { version = "0.4", path = "../../../netapp" }
netapp = "0.4"
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10"
opentelemetry-otlp = "0.10"
prometheus = "0.13"
[dev-dependencies]
aws-sdk-s3 = "0.8"
chrono = "0.4"

View file

@ -8,6 +8,7 @@ mod admin;
mod cli;
mod repair;
mod server;
mod tracing_setup;
use std::net::SocketAddr;
use std::path::PathBuf;

View file

@ -6,8 +6,7 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
use garage_admin::metrics::*;
use garage_admin::tracing_setup::*;
use garage_api::admin::api_server::AdminApiServer;
use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::run_web_server;
@ -16,6 +15,7 @@ use garage_web::run_web_server;
use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
use crate::tracing_setup::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@ -39,9 +39,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
info!("Initialize admin web server and metric backend...");
let admin_server_init = AdminServer::init();
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@ -54,6 +51,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
init_tracing(&export_to, garage.system.id)?;
}
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(garage.clone());
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
@ -80,32 +80,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
wait_from(watch_cancel.clone()),
));
let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
info!("Configure and run admin web server...");
Some(tokio::spawn(
admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
))
} else {
None
};
info!("Initializing Admin server...");
let admin_server = tokio::spawn(admin_server.run(wait_from(watch_cancel.clone())));
// Stuff runs
// When a cancel signal is sent, stuff stops
if let Err(e) = s3_api_server.await? {
warn!("S3 API server exited with error: {}", e);
} else {
info!("S3 API server exited without error.");
}
#[cfg(feature = "k2v")]
if let Err(e) = k2v_api_server.await? {
warn!("K2V API server exited with error: {}", e);
} else {
info!("K2V API server exited without error.");
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
} else {
info!("Web server exited without error.");
}
if let Some(a) = admin_server {
if let Err(e) = a.await? {
if let Err(e) = admin_server.await? {
warn!("Admin web server exited with error: {}", e);
}
} else {
info!("Admin API server exited without error.");
}
// Remove RPC handlers for system to break reference cycles
@ -113,6 +113,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Await for netapp RPC system to end
run_system.await?;
info!("Netapp exited");
// Drop all references so that stuff can terminate properly
drop(garage);

View file

@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.7.0", path = "../util" }
garage_admin = { version = "0.7.0", path = "../admin" }
arc-swap = "1.0"
bytes = "1.0"

View file

@ -121,6 +121,10 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,
/// Bearer token to use to access Admin API endpoints
pub admin_token: Option<String>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}