WIP: add metrics to the metadata engine #853
2 changed files with 26 additions and 2 deletions
|
@ -9,7 +9,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
global,
|
global,
|
||||||
metrics::{Counter, Unit, ValueRecorder},
|
metrics::{Unit, ValueRecorder},
|
||||||
KeyValue,
|
KeyValue,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -21,7 +21,7 @@ pub struct MetricDbProxy {
|
||||||
|
|
||||||
impl MetricDbProxy {
|
impl MetricDbProxy {
|
||||||
pub fn init(db: LmdbDb) -> Db {
|
pub fn init(db: LmdbDb) -> Db {
|
||||||
let meter = global::meter("garage/web");
|
let meter = global::meter("garage/db");
|
||||||
let s = Self {
|
let s = Self {
|
||||||
db,
|
db,
|
||||||
op: meter
|
op: meter
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use opentelemetry::{global, metrics::ValueRecorder, KeyValue};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor {
|
||||||
stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
|
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
|
||||||
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
||||||
|
metrics: ValueRecorder<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerProcessor {
|
impl WorkerProcessor {
|
||||||
|
@ -70,10 +73,15 @@ impl WorkerProcessor {
|
||||||
stop_signal: watch::Receiver<bool>,
|
stop_signal: watch::Receiver<bool>,
|
||||||
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let meter = global::meter("garage/util");
|
||||||
Self {
|
Self {
|
||||||
stop_signal,
|
stop_signal,
|
||||||
worker_chan,
|
worker_chan,
|
||||||
worker_info,
|
worker_info,
|
||||||
|
metrics: meter
|
||||||
|
.f64_value_recorder("util.worker_step")
|
||||||
|
.with_description("Duration and amount of worker steps executed")
|
||||||
|
.init(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,6 +111,7 @@ impl WorkerProcessor {
|
||||||
errors: 0,
|
errors: 0,
|
||||||
consecutive_errors: 0,
|
consecutive_errors: 0,
|
||||||
last_error: None,
|
last_error: None,
|
||||||
|
metrics: self.metrics.clone(),
|
||||||
};
|
};
|
||||||
workers.push(async move {
|
workers.push(async move {
|
||||||
worker.step().await;
|
worker.step().await;
|
||||||
|
@ -183,10 +192,13 @@ struct WorkerHandler {
|
||||||
errors: usize,
|
errors: usize,
|
||||||
consecutive_errors: usize,
|
consecutive_errors: usize,
|
||||||
last_error: Option<(String, u64)>,
|
last_error: Option<(String, u64)>,
|
||||||
|
metrics: ValueRecorder<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerHandler {
|
impl WorkerHandler {
|
||||||
async fn step(&mut self) {
|
async fn step(&mut self) {
|
||||||
|
let request_start = Instant::now();
|
||||||
|
//@FIXME we also want to track errors in metrics but I don't know how yet.
|
||||||
match self.state {
|
match self.state {
|
||||||
WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
|
WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
|
||||||
Ok(s) => {
|
Ok(s) => {
|
||||||
|
@ -229,5 +241,17 @@ impl WorkerHandler {
|
||||||
}
|
}
|
||||||
WorkerState::Done => unreachable!(),
|
WorkerState::Done => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// metrics
|
||||||
|
let metric_tags = [
|
||||||
|
KeyValue::new("state", self.state.to_string()),
|
||||||
|
KeyValue::new("name", self.worker.name()),
|
||||||
|
KeyValue::new("id", format!("{}", self.task_id)),
|
||||||
|
];
|
||||||
|
|
||||||
|
let delay_secs = Instant::now()
|
||||||
|
.saturating_duration_since(request_start)
|
||||||
|
.as_secs_f64();
|
||||||
|
self.metrics.record(delay_secs, &metric_tags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue