diff --git a/src/table/data.rs b/src/table/data.rs index 26cc3a5a..d8cd935e 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -2,6 +2,8 @@ use core::borrow::Borrow; use std::convert::TryInto; use std::sync::Arc; +use opentelemetry::Context; + use serde_bytes::ByteBuf; use tokio::sync::Notify; @@ -242,7 +244,9 @@ impl TableData { })?; if let Some((new_entry, new_bytes_hash)) = changed { - self.metrics.internal_update_counter.add(1); + self.metrics + .internal_update_counter + .add(&Context::current(), 1, &[]); let is_tombstone = new_entry.is_tombstone(); self.merkle_todo_notify.notify_one(); @@ -284,7 +288,9 @@ impl TableData { })?; if removed { - self.metrics.internal_delete_counter.add(1); + self.metrics + .internal_delete_counter + .add(&Context::current(), 1, &[]); self.merkle_todo_notify.notify_one(); } Ok(removed) @@ -312,7 +318,9 @@ impl TableData { })?; if removed { - self.metrics.internal_delete_counter.add(1); + self.metrics + .internal_delete_counter + .add(&Context::current(), 1, &[]); self.merkle_todo_notify.notify_one(); } Ok(removed) diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 8318a84f..82cfcb68 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -1,22 +1,22 @@ -use opentelemetry::{global, metrics::*, KeyValue}; +use opentelemetry::{global, metrics::*}; use garage_db as db; use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct TableMetrics { - pub(crate) _table_size: ValueObserver, - pub(crate) _merkle_tree_size: ValueObserver, - pub(crate) _merkle_todo_len: ValueObserver, - pub(crate) _gc_todo_len: ValueObserver, + pub(crate) _table_size: ObservableGauge, + pub(crate) _merkle_tree_size: ObservableGauge, + pub(crate) _merkle_todo_len: ObservableGauge, + pub(crate) _gc_todo_len: ObservableGauge, - pub(crate) get_request_counter: BoundCounter, - pub(crate) get_request_duration: BoundValueRecorder, - pub(crate) put_request_counter: BoundCounter, - pub(crate) put_request_duration: BoundValueRecorder, + pub(crate) get_request_counter: Counter, + pub(crate) get_request_duration: Histogram, + pub(crate) put_request_counter: Counter, + pub(crate) put_request_duration: Histogram, - pub(crate) internal_update_counter: BoundCounter, - pub(crate) internal_delete_counter: BoundCounter, + pub(crate) internal_update_counter: Counter, + pub(crate) internal_delete_counter: Counter, pub(crate) sync_items_sent: Counter, pub(crate) sync_items_received: Counter, @@ -32,91 +32,47 @@ impl TableMetrics { let meter = global::meter(table_name); TableMetrics { _table_size: meter - .u64_value_observer( - "table.size", - move |observer| { - if let Ok(Some(v)) = store.fast_len() { - observer.observe( - v as u64, - &[KeyValue::new("table_name", table_name)], - ); - } - }, - ) + .u64_observable_gauge("table.size") .with_description("Number of items in table") .init(), _merkle_tree_size: meter - .u64_value_observer( - "table.merkle_tree_size", - move |observer| { - if let Ok(Some(v)) = merkle_tree.fast_len() { - observer.observe( - v as u64, - &[KeyValue::new("table_name", table_name)], - ); - } - }, - ) + .u64_observable_gauge("table.merkle_tree_size") .with_description("Number of nodes in table's Merkle tree") .init(), _merkle_todo_len: meter - .u64_value_observer( - "table.merkle_updater_todo_queue_length", - move |observer| { - if let Ok(v) = merkle_todo.len() { - observer.observe( - v as u64, - &[KeyValue::new("table_name", table_name)], - ); - } - }, - ) + .u64_observable_gauge("table.merkle_updater_todo_queue_length") .with_description("Merkle tree updater TODO queue length") .init(), _gc_todo_len: meter - .u64_value_observer( - "table.gc_todo_queue_length", - move |observer| { - observer.observe( - gc_todo.len() as u64, - &[KeyValue::new("table_name", table_name)], - ); - }, - ) + .u64_observable_gauge("table.gc_todo_queue_length") .with_description("Table garbage collector TODO queue length") .init(), get_request_counter: meter .u64_counter("table.get_request_counter") .with_description("Number of get/get_range requests internally made on this table") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), get_request_duration: meter - .f64_value_recorder("table.get_request_duration") + .f64_histogram("table.get_request_duration") .with_description("Duration of get/get_range requests internally made on this table, in seconds") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), put_request_counter: meter .u64_counter("table.put_request_counter") .with_description("Number of insert/insert_many requests internally made on this table") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), put_request_duration: meter - .f64_value_recorder("table.put_request_duration") + .f64_histogram("table.put_request_duration") .with_description("Duration of insert/insert_many requests internally made on this table, in seconds") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), internal_update_counter: meter .u64_counter("table.internal_update_counter") .with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), internal_delete_counter: meter .u64_counter("table.internal_delete_counter") .with_description("Number of value deletions in the tree (due to GC or repartitioning)") - .init() - .bind(&[KeyValue::new("table_name", table_name)]), + .init(), sync_items_sent: meter .u64_counter("table.sync_items_sent") diff --git a/src/table/sync.rs b/src/table/sync.rs index 92a353c6..5cc6b9a9 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -5,7 +5,7 @@ use std::time::{Duration, Instant}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; -use opentelemetry::KeyValue; +use opentelemetry::{Context, KeyValue}; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -235,6 +235,7 @@ impl TableSyncer { for to in nodes.iter() { self.data.metrics.sync_items_sent.add( + &Context::current(), values.len() as u64, &[ KeyValue::new("table_name", F::TABLE_NAME), @@ -428,6 +429,7 @@ impl TableSyncer { .collect::>(); self.data.metrics.sync_items_sent.add( + &Context::current(), values.len() as u64, &[ KeyValue::new("table_name", F::TABLE_NAME), @@ -470,6 +472,7 @@ impl EndpointHandler for TableSync } SyncRpc::Items(items) => { self.data.metrics.sync_items_received.add( + &Context::current(), items.len() as u64, &[ KeyValue::new("table_name", F::TABLE_NAME), diff --git a/src/table/table.rs b/src/table/table.rs index 7ad79677..8ea2f09c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -9,7 +9,7 @@ use serde_bytes::ByteBuf; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, - Context, + Context, KeyValue, }; use garage_db as db; @@ -110,7 +110,11 @@ impl Table { .with_context(Context::current_with_span(span)) .await?; - self.data.metrics.put_request_counter.add(1); + self.data.metrics.put_request_counter.add( + &Context::current(), + 1, + &[KeyValue::new("table_name", F::TABLE_NAME)], + ); Ok(()) } @@ -154,7 +158,11 @@ impl Table { .with_context(Context::current_with_span(span)) .await?; - self.data.metrics.put_request_counter.add(1); + self.data.metrics.put_request_counter.add( + &Context::current(), + 1, + &[KeyValue::new("table_name", F::TABLE_NAME)], + ); Ok(()) } @@ -220,7 +228,11 @@ impl Table { .with_context(Context::current_with_span(span)) .await?; - self.data.metrics.get_request_counter.add(1); + self.data.metrics.get_request_counter.add( + &Context::current(), + 1, + &[KeyValue::new("table_name", F::TABLE_NAME)], + ); Ok(res) } @@ -306,7 +318,11 @@ impl Table { .with_context(Context::current_with_span(span)) .await?; - self.data.metrics.get_request_counter.add(1); + self.data.metrics.get_request_counter.add( + &Context::current(), + 1, + &[KeyValue::new("table_name", F::TABLE_NAME)], + ); Ok(res) }