From 0d0479969487f28b6877cb43e5137f2279d82cf6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 22 Feb 2022 14:52:41 +0100 Subject: [PATCH] Add metrics to API endpoint --- src/admin/metrics.rs | 2 +- src/api/api_server.rs | 113 +++++++++++++++++++++++++++++++++++------- src/garage/server.rs | 5 +- src/model/block.rs | 11 ++-- src/rpc/rpc_helper.rs | 7 +-- src/table/table.rs | 6 ++- src/util/metrics.rs | 29 ++++++++--- 7 files changed, 135 insertions(+), 38 deletions(-) diff --git a/src/admin/metrics.rs b/src/admin/metrics.rs index 64721af2..02549fe9 100644 --- a/src/admin/metrics.rs +++ b/src/admin/metrics.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; use std::time::SystemTime; -use std::net::SocketAddr; use futures::future::*; use hyper::{ diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 1bab3aaa..8502d9d8 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -8,12 +8,15 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server}; use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, trace::{FutureExt, TraceContextExt, Tracer}, Context, KeyValue, }; use garage_util::data::*; use garage_util::error::Error as GarageError; +use garage_util::metrics::RecordDuration; use garage_model::garage::Garage; use garage_model::key_table::Key; @@ -35,6 +38,34 @@ use crate::s3_put::*; use crate::s3_router::{Authorization, Endpoint}; use crate::s3_website::*; +struct ApiMetrics { + request_counter: Counter, + error_counter: Counter, + request_duration: ValueRecorder, +} + +impl ApiMetrics { + fn new() -> Self { + let meter = global::meter("garage/api"); + Self { + request_counter: meter + .u64_counter("api.request_counter") + .with_description("Number of API calls to the various S3 API endpoints") + .init(), + error_counter: meter + .u64_counter("api.error_counter") + .with_description( + "Number of API calls to the various S3 API endpoints that resulted in errors", + ) + .init(), + request_duration: meter + .f64_value_recorder("api.request_duration") + .with_description("Duration of API calls to the various S3 API endpoints") + .init(), + } + } +} + /// Run the S3 API server pub async fn run_api_server( garage: Arc, @@ -42,30 +73,19 @@ pub async fn run_api_server( ) -> Result<(), GarageError> { let addr = &garage.config.s3_api.api_bind_addr; + let metrics = Arc::new(ApiMetrics::new()); + let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); + let metrics = metrics.clone(); + let client_addr = conn.remote_addr(); async move { Ok::<_, GarageError>(service_fn(move |req: Request| { let garage = garage.clone(); + let metrics = metrics.clone(); - let tracer = opentelemetry::global::tracer("garage"); - let trace_id = gen_uuid(); - let span = tracer - .span_builder("S3 API call (unknown)") - .with_trace_id( - opentelemetry::trace::TraceId::from_hex(&hex::encode( - &trace_id.as_slice()[..16], - )) - .unwrap(), - ) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().path().to_string()), - ]) - .start(&tracer); - - handler(garage, req, client_addr).with_context(Context::current_with_span(span)) + handler(garage, metrics, req, client_addr) })) } }); @@ -81,13 +101,33 @@ pub async fn run_api_server( async fn handler( garage: Arc, + metrics: Arc, req: Request, addr: SocketAddr, ) -> Result, GarageError> { let uri = req.uri().clone(); info!("{} {} {}", addr, req.method(), uri); debug!("{:?}", req); - match handler_inner(garage.clone(), req).await { + + let tracer = opentelemetry::global::tracer("garage"); + let trace_id = gen_uuid(); + let span = tracer + .span_builder("S3 API call (unknown)") + .with_trace_id( + opentelemetry::trace::TraceId::from_hex(&hex::encode(&trace_id.as_slice()[..16])) + .unwrap(), + ) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().path().to_string()), + ]) + .start(&tracer); + + let res = handler_stage2(garage.clone(), metrics, req) + .with_context(Context::current_with_span(span)) + .await; + + match res { Ok(x) => { debug!("{} {:?}", x.status(), x.headers()); Ok(x) @@ -114,7 +154,11 @@ async fn handler( } } -async fn handler_inner(garage: Arc, req: Request) -> Result, Error> { +async fn handler_stage2( + garage: Arc, + metrics: Arc, + req: Request, +) -> Result, Error> { let authority = req .headers() .get(header::HOST) @@ -137,6 +181,37 @@ async fn handler_inner(garage: Arc, req: Request) -> Result(format!("S3 API {}", endpoint.name())); + let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; + + let res = handler_stage3(garage, req, endpoint, bucket_name) + .record_duration(&metrics.request_duration, &metrics_tags[..]) + .await; + + metrics.request_counter.add(1, &metrics_tags[..]); + + let status_code = match &res { + Ok(r) => r.status(), + Err(e) => e.http_status_code(), + }; + if status_code.is_client_error() || status_code.is_server_error() { + metrics.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", status_code.as_str().to_string()), + ], + ); + } + + res +} + +async fn handler_stage3( + garage: Arc, + req: Request, + endpoint: Endpoint, + bucket_name: Option, +) -> Result, Error> { // Some endpoints are processed early, before we even check for an API key if let Endpoint::PostObject = endpoint { return handle_post_object(garage, req, bucket_name.unwrap()).await; diff --git a/src/garage/server.rs b/src/garage/server.rs index 739dedbe..b11f8417 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -69,8 +69,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { )); info!("Configure and run admin web server..."); - let admin_server = - tokio::spawn(admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone()))); + let admin_server = tokio::spawn( + admin_server_init.run(config.admin_api.bind_addr, wait_from(watch_cancel.clone())), + ); // Stuff runs diff --git a/src/model/block.rs b/src/model/block.rs index 058c71fd..d97e64a8 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::{Duration}; +use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; @@ -21,9 +21,9 @@ use opentelemetry::{ use garage_util::data::*; use garage_util::error::*; +use garage_util::metrics::RecordDuration; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; -use garage_util::metrics::RecordDuration; use garage_rpc::system::System; use garage_rpc::*; @@ -409,11 +409,14 @@ impl BlockManager { /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result { - let data = self.read_block_internal(hash) + let data = self + .read_block_internal(hash) .bound_record_duration(&self.metrics.block_read_duration) .await?; - self.metrics.bytes_read.add(data.inner_buffer().len() as u64); + self.metrics + .bytes_read + .add(data.inner_buffer().len() as u64); Ok(BlockRpc::PutBlock { hash: *hash, data }) } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index f8bef47f..099c6429 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,6 +1,6 @@ //! Contain structs related to making RPCs use std::sync::Arc; -use std::time::{Duration}; +use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; @@ -134,7 +134,7 @@ impl RpcHelper { M: Rpc>, H: EndpointHandler, { - let metric_tags = [KeyValue::new("endpoint", endpoint.path().to_string())]; + let metric_tags = [KeyValue::new("rpc_endpoint", endpoint.path().to_string())]; let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self @@ -147,7 +147,8 @@ impl RpcHelper { self.0.metrics.rpc_counter.add(1, &metric_tags); let node_id = to.into(); - let rpc_call = endpoint.call(&node_id, msg, strat.rs_priority) + let rpc_call = endpoint + .call(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! { diff --git a/src/table/table.rs b/src/table/table.rs index 9ba243c0..69cac41a 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -161,7 +161,8 @@ where partition_key: &F::P, sort_key: &F::S, ) -> Result, Error> { - let res = self.get_internal(partition_key, sort_key) + let res = self + .get_internal(partition_key, sort_key) .bound_record_duration(&self.data.metrics.get_request_duration) .await?; self.data.metrics.get_request_counter.add(1); @@ -232,7 +233,8 @@ where filter: Option, limit: usize, ) -> Result, Error> { - let res = self.get_range_internal(partition_key, begin_sort_key, filter, limit) + let res = self + .get_range_internal(partition_key, begin_sort_key, filter, limit) .bound_record_duration(&self.data.metrics.get_request_duration) .await?; self.data.metrics.get_request_counter.add(1); diff --git a/src/util/metrics.rs b/src/util/metrics.rs index b3b1fc3c..cd5aa182 100644 --- a/src/util/metrics.rs +++ b/src/util/metrics.rs @@ -2,26 +2,40 @@ use std::time::SystemTime; use futures::{future::BoxFuture, Future, FutureExt}; -use opentelemetry::{KeyValue, metrics::*}; +use opentelemetry::{metrics::*, KeyValue}; pub trait RecordDuration<'a>: 'a { type Output; - fn record_duration(self, r: &'a ValueRecorder, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output>; + fn record_duration( + self, + r: &'a ValueRecorder, + attributes: &'a [KeyValue], + ) -> BoxFuture<'a, Self::Output>; fn bound_record_duration(self, r: &'a BoundValueRecorder) -> BoxFuture<'a, Self::Output>; } impl<'a, T, O> RecordDuration<'a> for T -where T: Future + Send + 'a { +where + T: Future + Send + 'a, +{ type Output = O; - fn record_duration(self, r: &'a ValueRecorder, attributes: &'a [KeyValue]) -> BoxFuture<'a, Self::Output> { + fn record_duration( + self, + r: &'a ValueRecorder, + attributes: &'a [KeyValue], + ) -> BoxFuture<'a, Self::Output> { async move { let request_start = SystemTime::now(); let res = self.await; - r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), attributes); + r.record( + request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), + attributes, + ); res - }.boxed() + } + .boxed() } fn bound_record_duration(self, r: &'a BoundValueRecorder) -> BoxFuture<'a, Self::Output> { @@ -30,6 +44,7 @@ where T: Future + Send + 'a { let res = self.await; r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); res - }.boxed() + } + .boxed() } }