2022-12-05 17:43:48 +00:00
|
|
|
use std::convert::Infallible;
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
use anyhow::Result;
|
|
|
|
use futures::future::*;
|
|
|
|
use log::*;
|
|
|
|
|
|
|
|
use hyper::{
|
|
|
|
header::CONTENT_TYPE,
|
|
|
|
service::{make_service_fn, service_fn},
|
|
|
|
Body, Method, Request, Response, Server,
|
|
|
|
};
|
|
|
|
use opentelemetry_prometheus::PrometheusExporter;
|
|
|
|
use prometheus::{Encoder, TextEncoder};
|
|
|
|
|
|
|
|
pub struct MetricsServer {
|
|
|
|
bind_addr: Option<SocketAddr>,
|
|
|
|
exporter: PrometheusExporter,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl MetricsServer {
|
|
|
|
pub fn init(bind_addr: Option<SocketAddr>) -> MetricsServer {
|
2022-12-06 14:02:35 +00:00
|
|
|
let exporter = opentelemetry_prometheus::exporter()
|
|
|
|
.with_default_summary_quantiles(vec![0.25, 0.5, 0.75, 0.9, 0.95, 0.99])
|
|
|
|
.with_default_histogram_boundaries(vec![
|
2022-12-07 10:13:43 +00:00
|
|
|
0.001, 0.0015, 0.002, 0.003, 0.005, 0.007, 0.01, 0.015, 0.02, 0.03, 0.05, 0.07,
|
|
|
|
0.1, 0.15, 0.2, 0.3, 0.5, 0.7, 1., 1.5, 2., 3., 5., 7., 10., 15., 20., 30., 40.,
|
|
|
|
50., 60., 70., 100.,
|
2022-12-06 14:02:35 +00:00
|
|
|
])
|
|
|
|
.init();
|
2022-12-05 17:43:48 +00:00
|
|
|
Self {
|
|
|
|
bind_addr,
|
|
|
|
exporter,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(self, shutdown_signal: impl Future<Output = ()>) -> Result<()> {
|
|
|
|
if let Some(addr) = self.bind_addr {
|
|
|
|
let metrics_server = Arc::new(self);
|
|
|
|
|
|
|
|
let make_svc = make_service_fn(move |_conn| {
|
|
|
|
let metrics_server = metrics_server.clone();
|
|
|
|
async move {
|
|
|
|
Ok::<_, Infallible>(service_fn(move |req| {
|
|
|
|
metrics_server.clone().serve_req(req)
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let server = Server::bind(&addr).serve(make_svc);
|
|
|
|
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
|
|
|
info!("Metrics server listening on http://{}", addr);
|
|
|
|
|
|
|
|
graceful.await?;
|
|
|
|
} else {
|
|
|
|
info!("Metrics server is disabled");
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn serve_req(
|
|
|
|
self: Arc<MetricsServer>,
|
|
|
|
req: Request<Body>,
|
|
|
|
) -> Result<Response<Body>, hyper::Error> {
|
|
|
|
debug!("{} {}", req.method(), req.uri());
|
|
|
|
|
|
|
|
let response = match (req.method(), req.uri().path()) {
|
|
|
|
(&Method::GET, "/metrics") => {
|
|
|
|
let mut buffer = vec![];
|
|
|
|
let encoder = TextEncoder::new();
|
|
|
|
let metric_families = self.exporter.registry().gather();
|
|
|
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
|
|
|
|
|
|
|
Response::builder()
|
|
|
|
.status(200)
|
|
|
|
.header(CONTENT_TYPE, encoder.format_type())
|
|
|
|
.body(Body::from(buffer))
|
|
|
|
.unwrap()
|
|
|
|
}
|
|
|
|
_ => Response::builder()
|
|
|
|
.status(404)
|
|
|
|
.body(Body::from("Not implemented"))
|
|
|
|
.unwrap(),
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(response)
|
|
|
|
}
|
|
|
|
}
|