tricot/src/metrics.rs

111 lines
2.9 KiB
Rust

use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Result;
use futures::future::*;
use tracing::*;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
};
use opentelemetry::sdk::metrics;
use prometheus::{Encoder, TextEncoder};
/// HTTP endpoint that exposes the process metrics in Prometheus text format.
///
/// Holds the Prometheus registry that is gathered and encoded on each
/// `GET /metrics` request, together with the optional listening address.
pub struct MetricsServer {
/// Address the HTTP server binds to; when `None`, `run` does not start a
/// server and only logs that metrics are disabled.
bind_addr: Option<SocketAddr>,
/// Prometheus registry whose metric families are served on `/metrics`.
registry: prometheus::Registry,
}
impl MetricsServer {
pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
let registry = prometheus::Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.with_aggregation_selector(AggregationSelector)
.without_counter_suffixes()
.build()
.expect("build prometheus registry");
let mp = metrics::MeterProvider::builder()
.with_reader(exporter)
.build();
opentelemetry::global::set_meter_provider(mp);
Self {
bind_addr,
registry,
}
}
pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> {
if let Some(addr) = self.bind_addr {
let metrics_server = Arc::new(self);
let make_svc = make_service_fn(move |_conn| {
let metrics_server = metrics_server.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
metrics_server.clone().serve_req(req)
}))
}
});
let server = Server::bind(&addr).serve(make_svc);
let graceful = server.with_graceful_shutdown(shutdown_signal);
info!("Metrics server listening on http://{}", addr);
graceful.await?;
} else {
info!("Metrics server is disabled");
}
Ok(())
}
async fn serve_req(
self: Arc<MetricsServer>,
req: Request<Body>,
) -> Result<Response<Body>, hyper::Error> {
debug!("{} {}", req.method(), req.uri());
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
_ => Response::builder()
.status(404)
.body(Body::from("Not implemented"))
.unwrap(),
};
Ok(response)
}
}
/// Aggregation selector handed to the Prometheus exporter: histogram
/// instruments get an explicit list of bucket boundaries, every other
/// instrument kind falls back to the SDK's default selector.
struct AggregationSelector;
impl metrics::reader::AggregationSelector for AggregationSelector {
    /// Chooses the aggregation for an instrument of the given `kind`.
    ///
    /// Histograms are aggregated into explicit buckets (boundaries listed
    /// below, with min/max recording enabled); all other kinds are delegated
    /// to `DefaultAggregationSelector`.
    fn aggregation(&self, kind: metrics::InstrumentKind) -> metrics::Aggregation {
        // Upper bucket boundaries used for every histogram instrument.
        const HISTOGRAM_BOUNDARIES: &[f64] = &[
            0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07,
            0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30.,
            40., 50., 60., 70., 100.,
        ];

        if matches!(kind, metrics::InstrumentKind::Histogram) {
            return metrics::Aggregation::ExplicitBucketHistogram {
                boundaries: HISTOGRAM_BOUNDARIES.to_vec(),
                record_min_max: true,
            };
        }

        metrics::reader::DefaultAggregationSelector::new().aggregation(kind)
    }
}