Add metrics to API endpoint
This commit is contained in:
parent
c4a9a8e0bb
commit
0d04799694
7 changed files with 135 additions and 38 deletions
|
@ -1,7 +1,7 @@
|
|||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use futures::future::*;
|
||||
use hyper::{
|
||||
|
|
|
@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn};
|
|||
use hyper::{Body, Method, Request, Response, Server};
|
||||
|
||||
use opentelemetry::{
|
||||
global,
|
||||
metrics::{Counter, ValueRecorder},
|
||||
trace::{FutureExt, TraceContextExt, Tracer},
|
||||
Context, KeyValue,
|
||||
};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
use garage_util::metrics::RecordDuration;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
use garage_model::key_table::Key;
|
||||
|
@ -35,6 +38,34 @@ use crate::s3_put::*;
|
|||
use crate::s3_router::{Authorization, Endpoint};
|
||||
use crate::s3_website::*;
|
||||
|
||||
struct ApiMetrics {
|
||||
request_counter: Counter<u64>,
|
||||
error_counter: Counter<u64>,
|
||||
request_duration: ValueRecorder<f64>,
|
||||
}
|
||||
|
||||
impl ApiMetrics {
|
||||
fn new() -> Self {
|
||||
let meter = global::meter("garage/api");
|
||||
Self {
|
||||
request_counter: meter
|
||||
.u64_counter("api.request_counter")
|
||||
.with_description("Number of API calls to the various S3 API endpoints")
|
||||
.init(),
|
||||
error_counter: meter
|
||||
.u64_counter("api.error_counter")
|
||||
.with_description(
|
||||
"Number of API calls to the various S3 API endpoints that resulted in errors",
|
||||
)
|
||||
.init(),
|
||||
request_duration: meter
|
||||
.f64_value_recorder("api.request_duration")
|
||||
.with_description("Duration of API calls to the various S3 API endpoints")
|
||||
.init(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the S3 API server
|
||||
pub async fn run_api_server(
|
||||
garage: Arc<Garage>,
|
||||
|
@ -42,30 +73,19 @@ pub async fn run_api_server(
|
|||
) -> Result<(), GarageError> {
|
||||
let addr = &garage.config.s3_api.api_bind_addr;
|
||||
|
||||
let metrics = Arc::new(ApiMetrics::new());
|
||||
|
||||
let service = make_service_fn(|conn: &AddrStream| {
|
||||
let garage = garage.clone();
|
||||
let metrics = metrics.clone();
|
||||
|
||||
let client_addr = conn.remote_addr();
|
||||
async move {
|
||||
Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
|
||||
let garage = garage.clone();
|
||||
let metrics = metrics.clone();
|
||||
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
let trace_id = gen_uuid();
|
||||
let span = tracer
|
||||
.span_builder("S3 API call (unknown)")
|
||||
.with_trace_id(
|
||||
opentelemetry::trace::TraceId::from_hex(&hex::encode(
|
||||
&trace_id.as_slice()[..16],
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.with_attributes(vec![
|
||||
KeyValue::new("method", format!("{}", req.method())),
|
||||
KeyValue::new("uri", req.uri().path().to_string()),
|
||||
])
|
||||
.start(&tracer);
|
||||
|
||||
handler(garage, req, client_addr).with_context(Context::current_with_span(span))
|
||||
handler(garage, metrics, req, client_addr)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
@ -81,13 +101,33 @@ pub async fn run_api_server(
|
|||
|
||||
async fn handler(
|
||||
garage: Arc<Garage>,
|
||||
metrics: Arc<ApiMetrics>,
|
||||
req: Request<Body>,
|
||||
addr: SocketAddr,
|
||||
) -> Result<Response<Body>, GarageError> {
|
||||
let uri = req.uri().clone();
|
||||
info!("{} {} {}", addr, req.method(), uri);
|
||||
debug!("{:?}", req);
|
||||
match handler_inner(garage.clone(), req).await {
|
||||
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
let trace_id = gen_uuid();
|
||||
let span = tracer
|
||||
.span_builder("S3 API call (unknown)")
|
||||
.with_trace_id(
|
||||
opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16]))
|
||||
.unwrap(),
|
||||
)
|
||||
.with_attributes(vec![
|
||||
KeyValue::new("method", format!("{}", req.method())),
|
||||
KeyValue::new("uri", req.uri().path().to_string()),
|
||||
])
|
||||
.start(&tracer);
|
||||
|
||||
let res = handler_stage2(garage.clone(), metrics, req)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(x) => {
|
||||
debug!("{} {:?}", x.status(), x.headers());
|
||||
Ok(x)
|
||||
|
@ -114,7 +154,11 @@ async fn handler(
|
|||
}
|
||||
}
|
||||
|
||||
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
async fn handler_stage2(
|
||||
garage: Arc<Garage>,
|
||||
metrics: Arc<ApiMetrics>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let authority = req
|
||||
.headers()
|
||||
.get(header::HOST)
|
||||
|
@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||
.span()
|
||||
.update_name::<String>(format!("S3 API {}", endpoint.name()));
|
||||
|
||||
let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())];
|
||||
|
||||
let res = handler_stage3(garage, req, endpoint, bucket_name)
|
||||
.record_duration(&metrics.request_duration, &metrics_tags[..])
|
||||
.await;
|
||||
|
||||
metrics.request_counter.add(1, &metrics_tags[..]);
|
||||
|
||||
let status_code = match &res {
|
||||
Ok(r) => r.status(),
|
||||
Err(e) => e.http_status_code(),
|
||||
};
|
||||
if status_code.is_client_error() || status_code.is_server_error() {
|
||||
metrics.error_counter.add(
|
||||
1,
|
||||
&[
|
||||
metrics_tags[0].clone(),
|
||||
KeyValue::new("status_code", status_code.as_str().to_string()),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn handler_stage3(
|
||||
garage: Arc<Garage>,
|
||||
req: Request<Body>,
|
||||
endpoint: Endpoint,
|
||||
bucket_name: Option<String>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// Some endpoints are processed early, before we even check for an API key
|
||||
if let Endpoint::PostObject = endpoint {
|
||||
return handle_post_object(garage, req, bucket_name.unwrap()).await;
|
||||
|
|
|
@ -69,8 +69,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||
));
|
||||
|
||||
info!("Configure and run admin web server...");
|
||||
let admin_server =
|
||||
tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())));
|
||||
let admin_server = tokio::spawn(
|
||||
admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())),
|
||||
);
|
||||
|
||||
// Stuff runs
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::convert::TryInto;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration};
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwapOption;
|
||||
use async_trait::async_trait;
|
||||
|
@ -21,9 +21,9 @@ use opentelemetry::{
|
|||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::*;
|
||||
use garage_util::metrics::RecordDuration;
|
||||
use garage_util::time::*;
|
||||
use garage_util::tranquilizer::Tranquilizer;
|
||||
use garage_util::metrics::RecordDuration;
|
||||
|
||||
use garage_rpc::system::System;
|
||||
use garage_rpc::*;
|
||||
|
@ -409,11 +409,14 @@ impl BlockManager {
|
|||
|
||||
/// Read block from disk, verifying it's integrity
|
||||
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
|
||||
let data = self.read_block_internal(hash)
|
||||
let data = self
|
||||
.read_block_internal(hash)
|
||||
.bound_record_duration(&self.metrics.block_read_duration)
|
||||
.await?;
|
||||
|
||||
self.metrics.bytes_read.add(data.inner_buffer().len() as u64);
|
||||
self.metrics
|
||||
.bytes_read
|
||||
.add(data.inner_buffer().len() as u64);
|
||||
|
||||
Ok(BlockRpc::PutBlock { hash: *hash, data })
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Contain structs related to making RPCs
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::future::join_all;
|
||||
use futures::stream::futures_unordered::FuturesUnordered;
|
||||
|
@ -134,7 +134,7 @@ impl RpcHelper {
|
|||
M: Rpc<Response = Result<S, Error>>,
|
||||
H: EndpointHandler<M>,
|
||||
{
|
||||
let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())];
|
||||
let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())];
|
||||
|
||||
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
|
||||
let permit = self
|
||||
|
@ -147,7 +147,8 @@ impl RpcHelper {
|
|||
self.0.metrics.rpc_counter.add(1, &metric_tags);
|
||||
|
||||
let node_id = to.into();
|
||||
let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority)
|
||||
let rpc_call = endpoint
|
||||
.call(&node_id, msg, strat.rs_priority)
|
||||
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
||||
|
||||
select! {
|
||||
|
|
|
@ -161,7 +161,8 @@ where
|
|||
partition_key: &F::P,
|
||||
sort_key: &F::S,
|
||||
) -> Result<Option<F::E>, Error> {
|
||||
let res = self.get_internal(partition_key, sort_key)
|
||||
let res = self
|
||||
.get_internal(partition_key, sort_key)
|
||||
.bound_record_duration(&self.data.metrics.get_request_duration)
|
||||
.await?;
|
||||
self.data.metrics.get_request_counter.add(1);
|
||||
|
@ -232,7 +233,8 @@ where
|
|||
filter: Option<F::Filter>,
|
||||
limit: usize,
|
||||
) -> Result<Vec<F::E>, Error> {
|
||||
let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit)
|
||||
let res = self
|
||||
.get_range_internal(partition_key, begin_sort_key, filter, limit)
|
||||
.bound_record_duration(&self.data.metrics.get_request_duration)
|
||||
.await?;
|
||||
self.data.metrics.get_request_counter.add(1);
|
||||
|
|
|
@ -2,26 +2,40 @@ use std::time::SystemTime;
|
|||
|
||||
use futures::{future::BoxFuture, Future, FutureExt};
|
||||
|
||||
use opentelemetry::{KeyValue, metrics::*};
|
||||
use opentelemetry::{metrics::*, KeyValue};
|
||||
|
||||
pub trait RecordDuration<'a>: 'a {
|
||||
type Output;
|
||||
|
||||
fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>;
|
||||
fn record_duration(
|
||||
self,
|
||||
r: &'a ValueRecorder<f64>,
|
||||
attributes: &'a [KeyValue],
|
||||
) -> BoxFuture<'a, Self::Output>;
|
||||
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output>;
|
||||
}
|
||||
|
||||
impl<'a, T, O> RecordDuration<'a> for T
|
||||
where T: Future<Output=O> + Send + 'a {
|
||||
where
|
||||
T: Future<Output = O> + Send + 'a,
|
||||
{
|
||||
type Output = O;
|
||||
|
||||
fn record_duration(self, r: &'a ValueRecorder<f64>, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> {
|
||||
fn record_duration(
|
||||
self,
|
||||
r: &'a ValueRecorder<f64>,
|
||||
attributes: &'a [KeyValue],
|
||||
) -> BoxFuture<'a, Self::Output> {
|
||||
async move {
|
||||
let request_start = SystemTime::now();
|
||||
let res = self.await;
|
||||
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes);
|
||||
r.record(
|
||||
request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
|
||||
attributes,
|
||||
);
|
||||
res
|
||||
}.boxed()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
|
||||
|
@ -30,6 +44,7 @@ where T: Future<Output=O> + Send + 'a {
|
|||
let res = self.await;
|
||||
r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
|
||||
res
|
||||
}.boxed()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue