Add metrics to API endpoint

This commit is contained in:
Alex 2022-02-22 14:52:41 +01:00
parent 818daa5c78
commit 2a5609b292
Signed by: lx
GPG key ID: 0E496D15096376BE
7 changed files with 135 additions and 38 deletions

View file

@ -1,7 +1,7 @@
use std::convert::Infallible; use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use std::net::SocketAddr;
use futures::future::*; use futures::future::*;
use hyper::{ use hyper::{

View file

@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server}; use hyper::{Body, Method, Request, Response, Server};
use opentelemetry::{ use opentelemetry::{
global,
metrics::{Counter, ValueRecorder},
trace::{FutureExt, TraceContextExt, Tracer}, trace::{FutureExt, TraceContextExt, Tracer},
Context, KeyValue, Context, KeyValue,
}; };
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::metrics::RecordDuration;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::Key; use garage_model::key_table::Key;
@ -35,6 +38,34 @@ use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint}; use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*; use crate::s3_website::*;
struct ApiMetrics {
request_counter: Counter<u64>,
error_counter: Counter<u64>,
request_duration: ValueRecorder<f64>,
}
impl ApiMetrics {
fn new() -> Self {
let meter = global::meter("garage/api");
Self {
request_counter: meter
.u64_counter("api.request_counter")
.with_description("Number of API calls to the various S3 API endpoints")
.init(),
error_counter: meter
.u64_counter("api.error_counter")
.with_description(
"Number of API calls to the various S3 API endpoints that resulted in errors",
)
.init(),
request_duration: meter
.f64_value_recorder("api.request_duration")
.with_description("Duration of API calls to the various S3 API endpoints")
.init(),
}
}
}
/// Run the S3 API server /// Run the S3 API server
pub async fn run_api_server( pub async fn run_api_server(
garage: Arc<Garage>, garage: Arc<Garage>,
@ -42,30 +73,19 @@ pub async fn run_api_server(
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr; let addr = &garage.config.s3_api.api_bind_addr;
let metrics = Arc::new(ApiMetrics::new());
let service = make_service_fn(|conn: &AddrStream| { let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone(); let garage = garage.clone();
let metrics = metrics.clone();
let client_addr = conn.remote_addr(); let client_addr = conn.remote_addr();
async move { async move {
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone(); let garage = garage.clone();
let metrics = metrics.clone();
let tracer = opentelemetry::global::tracer("garage"); handler(garage, metrics, req, client_addr)
let trace_id = gen_uuid();
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(
&trace_id.as_slice()[..16],
))
.unwrap(),
)
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().path().to_string()),
])
.start(&tracer);
handler(garage, req, client_addr).with_context(Context::current_with_span(span))
})) }))
} }
}); });
@ -81,13 +101,33 @@ pub async fn run_api_server(
async fn handler( async fn handler(
garage: Arc<Garage>, garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>, req: Request<Body>,
addr: SocketAddr, addr: SocketAddr,
) -> Result<Response<Body>, GarageError> { ) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone(); let uri = req.uri().clone();
info!("{} {} {}", addr, req.method(), uri); info!("{} {} {}", addr, req.method(), uri);
debug!("{:?}", req); debug!("{:?}", req);
match handler_inner(garage.clone(), req).await {
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
.span_builder("S3 API call (unknown)")
.with_trace_id(
opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16]))
.unwrap(),
)
.with_attributes(vec![
KeyValue::new("method", format!("{}", req.method())),
KeyValue::new("uri", req.uri().path().to_string()),
])
.start(&tracer);
let res = handler_stage2(garage.clone(), metrics, req)
.with_context(Context::current_with_span(span))
.await;
match res {
Ok(x) => { Ok(x) => {
debug!("{} {:?}", x.status(), x.headers()); debug!("{} {:?}", x.status(), x.headers());
Ok(x) Ok(x)
@ -114,7 +154,11 @@ async fn handler(
} }
} }
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> { async fn handler_stage2(
garage: Arc<Garage>,
metrics: Arc<ApiMetrics>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let authority = req let authority = req
.headers() .headers()
.get(header::HOST) .get(header::HOST)
@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.span() .span()
.update_name::<String>(format!("S3 API {}", endpoint.name())); .update_name::<String>(format!("S3 API {}", endpoint.name()));
let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
let res = handler_stage3(garage, req, endpoint, bucket_name)
.record_duration(&metrics.request_duration, &metrics_tags[..])
.await;
metrics.request_counter.add(1, &metrics_tags[..]);
let status_code = match &res {
Ok(r) => r.status(),
Err(e) => e.http_status_code(),
};
if status_code.is_client_error() || status_code.is_server_error() {
metrics.error_counter.add(
1,
&[
metrics_tags[0].clone(),
KeyValue::new("status_code", status_code.as_str().to_string()),
],
);
}
res
}
async fn handler_stage3(
garage: Arc<Garage>,
req: Request<Body>,
endpoint: Endpoint,
bucket_name: Option<String>,
) -> Result<Response<Body>, Error> {
// Some endpoints are processed early, before we even check for an API key // Some endpoints are processed early, before we even check for an API key
if let Endpoint::PostObject = endpoint { if let Endpoint::PostObject = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await; return handle_post_object(garage, req, bucket_name.unwrap()).await;

View file

@ -69,8 +69,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
)); ));
info!("Configure and run admin web server..."); info!("Configure and run admin web server...");
let admin_server = let admin_server = tokio::spawn(
tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone()))); admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())),
);
// Stuff runs // Stuff runs

View file

@ -1,7 +1,7 @@
use std::convert::TryInto; use std::convert::TryInto;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration}; use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
@ -21,9 +21,9 @@ use opentelemetry::{
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
use garage_util::metrics::RecordDuration;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -409,11 +409,14 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity /// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self.read_block_internal(hash) let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration) .bound_record_duration(&self.metrics.block_read_duration)
.await?; .await?;
self.metrics.bytes_read.add(data.inner_buffer().len() as u64); self.metrics
.bytes_read
.add(data.inner_buffer().len() as u64);
Ok(BlockRpc::PutBlock { hash: *hash, data }) Ok(BlockRpc::PutBlock { hash: *hash, data })
} }

View file

@ -1,6 +1,6 @@
//! Contain structs related to making RPCs //! Contain structs related to making RPCs
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration}; use std::time::Duration;
use futures::future::join_all; use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
@ -134,7 +134,7 @@ impl RpcHelper {
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())]; let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self let permit = self
@ -147,7 +147,8 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags); self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into(); let node_id = to.into();
let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority) let rpc_call = endpoint
.call(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags); .record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! { select! {

View file

@ -161,7 +161,8 @@ where
partition_key: &F::P, partition_key: &F::P,
sort_key: &F::S, sort_key: &F::S,
) -> Result<Option<F::E>, Error> { ) -> Result<Option<F::E>, Error> {
let res = self.get_internal(partition_key, sort_key) let res = self
.get_internal(partition_key, sort_key)
.bound_record_duration(&self.data.metrics.get_request_duration) .bound_record_duration(&self.data.metrics.get_request_duration)
.await?; .await?;
self.data.metrics.get_request_counter.add(1); self.data.metrics.get_request_counter.add(1);
@ -232,7 +233,8 @@ where
filter: Option<F::Filter>, filter: Option<F::Filter>,
limit: usize, limit: usize,
) -> Result<Vec<F::E>, Error> { ) -> Result<Vec<F::E>, Error> {
let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit) let res = self
.get_range_internal(partition_key, begin_sort_key, filter, limit)
.bound_record_duration(&self.data.metrics.get_request_duration) .bound_record_duration(&self.data.metrics.get_request_duration)
.await?; .await?;
self.data.metrics.get_request_counter.add(1); self.data.metrics.get_request_counter.add(1);

View file

@ -2,26 +2,40 @@ use std::time::SystemTime;
use futures::{future::BoxFuture, Future, FutureExt}; use futures::{future::BoxFuture, Future, FutureExt};
use opentelemetry::{KeyValue, metrics::*}; use opentelemetry::{metrics::*, KeyValue};
pub trait RecordDuration<'a>: 'a { pub trait RecordDuration<'a>: 'a {
type Output; type Output;
fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>; fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output>;
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>; fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
} }
impl<'a, T, O> RecordDuration<'a> for T impl<'a, T, O> RecordDuration<'a> for T
where T: Future<Output=O> + Send + 'a { where
T: Future<Output = O> + Send + 'a,
{
type Output = O; type Output = O;
fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> { fn record_duration(
self,
r: &'a ValueRecorder<f64>,
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output> {
async move { async move {
let request_start = SystemTime::now(); let request_start = SystemTime::now();
let res = self.await; let res = self.await;
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes); r.record(
request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
attributes,
);
res res
}.boxed() }
.boxed()
} }
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> { fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
@ -30,6 +44,7 @@ where T: Future<Output=O> + Send + 'a {
let res = self.await; let res = self.await;
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
res res
}.boxed() }
.boxed()
} }
} }