table: Ported to opentelemetry 0.18.

This commit is contained in:
Jonathan Davies 2023-01-27 17:29:32 +00:00
parent 1e6e8db264
commit 2d584889f2
4 changed files with 59 additions and 76 deletions

View file

@ -2,6 +2,8 @@ use core::borrow::Borrow;
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use opentelemetry::Context;
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use tokio::sync::Notify; use tokio::sync::Notify;
@ -242,7 +244,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
})?; })?;
if let Some((new_entry, new_bytes_hash)) = changed { if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1); self.metrics
.internal_update_counter
.add(&Context::current(), 1, &[]);
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
@ -284,7 +288,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
})?; })?;
if removed { if removed {
self.metrics.internal_delete_counter.add(1); self.metrics
.internal_delete_counter
.add(&Context::current(), 1, &[]);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} }
Ok(removed) Ok(removed)
@ -312,7 +318,9 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
})?; })?;
if removed { if removed {
self.metrics.internal_delete_counter.add(1); self.metrics
.internal_delete_counter
.add(&Context::current(), 1, &[]);
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} }
Ok(removed) Ok(removed)

View file

@ -1,22 +1,22 @@
use opentelemetry::{global, metrics::*, KeyValue}; use opentelemetry::{global, metrics::*};
use garage_db as db; use garage_db as db;
use garage_db::counted_tree_hack::CountedTree; use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct TableMetrics { pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>, pub(crate) _table_size: ObservableGauge<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>, pub(crate) _merkle_tree_size: ObservableGauge<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>, pub(crate) _merkle_todo_len: ObservableGauge<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>, pub(crate) _gc_todo_len: ObservableGauge<u64>,
pub(crate) get_request_counter: BoundCounter<u64>, pub(crate) get_request_counter: Counter<u64>,
pub(crate) get_request_duration: BoundValueRecorder<f64>, pub(crate) get_request_duration: Histogram<f64>,
pub(crate) put_request_counter: BoundCounter<u64>, pub(crate) put_request_counter: Counter<u64>,
pub(crate) put_request_duration: BoundValueRecorder<f64>, pub(crate) put_request_duration: Histogram<f64>,
pub(crate) internal_update_counter: BoundCounter<u64>, pub(crate) internal_update_counter: Counter<u64>,
pub(crate) internal_delete_counter: BoundCounter<u64>, pub(crate) internal_delete_counter: Counter<u64>,
pub(crate) sync_items_sent: Counter<u64>, pub(crate) sync_items_sent: Counter<u64>,
pub(crate) sync_items_received: Counter<u64>, pub(crate) sync_items_received: Counter<u64>,
@ -32,91 +32,47 @@ impl TableMetrics {
let meter = global::meter(table_name); let meter = global::meter(table_name);
TableMetrics { TableMetrics {
_table_size: meter _table_size: meter
.u64_value_observer( .u64_observable_gauge("table.size")
"table.size",
move |observer| {
if let Ok(Some(v)) = store.fast_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Number of items in table") .with_description("Number of items in table")
.init(), .init(),
_merkle_tree_size: meter _merkle_tree_size: meter
.u64_value_observer( .u64_observable_gauge("table.merkle_tree_size")
"table.merkle_tree_size",
move |observer| {
if let Ok(Some(v)) = merkle_tree.fast_len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Number of nodes in table's Merkle tree") .with_description("Number of nodes in table's Merkle tree")
.init(), .init(),
_merkle_todo_len: meter _merkle_todo_len: meter
.u64_value_observer( .u64_observable_gauge("table.merkle_updater_todo_queue_length")
"table.merkle_updater_todo_queue_length",
move |observer| {
if let Ok(v) = merkle_todo.len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Merkle tree updater TODO queue length") .with_description("Merkle tree updater TODO queue length")
.init(), .init(),
_gc_todo_len: meter _gc_todo_len: meter
.u64_value_observer( .u64_observable_gauge("table.gc_todo_queue_length")
"table.gc_todo_queue_length",
move |observer| {
observer.observe(
gc_todo.len() as u64,
&[KeyValue::new("table_name", table_name)],
);
},
)
.with_description("Table garbage collector TODO queue length") .with_description("Table garbage collector TODO queue length")
.init(), .init(),
get_request_counter: meter get_request_counter: meter
.u64_counter("table.get_request_counter") .u64_counter("table.get_request_counter")
.with_description("Number of get/get_range requests internally made on this table") .with_description("Number of get/get_range requests internally made on this table")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
get_request_duration: meter get_request_duration: meter
.f64_value_recorder("table.get_request_duration") .f64_histogram("table.get_request_duration")
.with_description("Duration of get/get_range requests internally made on this table, in seconds") .with_description("Duration of get/get_range requests internally made on this table, in seconds")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
put_request_counter: meter put_request_counter: meter
.u64_counter("table.put_request_counter") .u64_counter("table.put_request_counter")
.with_description("Number of insert/insert_many requests internally made on this table") .with_description("Number of insert/insert_many requests internally made on this table")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
put_request_duration: meter put_request_duration: meter
.f64_value_recorder("table.put_request_duration") .f64_histogram("table.put_request_duration")
.with_description("Duration of insert/insert_many requests internally made on this table, in seconds") .with_description("Duration of insert/insert_many requests internally made on this table, in seconds")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
internal_update_counter: meter internal_update_counter: meter
.u64_counter("table.internal_update_counter") .u64_counter("table.internal_update_counter")
.with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)") .with_description("Number of value updates where the value actually changes (includes creation of new key and update of existing key)")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
internal_delete_counter: meter internal_delete_counter: meter
.u64_counter("table.internal_delete_counter") .u64_counter("table.internal_delete_counter")
.with_description("Number of value deletions in the tree (due to GC or repartitioning)") .with_description("Number of value deletions in the tree (due to GC or repartitioning)")
.init() .init(),
.bind(&[KeyValue::new("table_name", table_name)]),
sync_items_sent: meter sync_items_sent: meter
.u64_counter("table.sync_items_sent") .u64_counter("table.sync_items_sent")

View file

@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use futures_util::stream::*; use futures_util::stream::*;
use opentelemetry::KeyValue; use opentelemetry::{Context, KeyValue};
use rand::Rng; use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
@ -235,6 +235,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
for to in nodes.iter() { for to in nodes.iter() {
self.data.metrics.sync_items_sent.add( self.data.metrics.sync_items_sent.add(
&Context::current(),
values.len() as u64, values.len() as u64,
&[ &[
KeyValue::new("table_name", F::TABLE_NAME), KeyValue::new("table_name", F::TABLE_NAME),
@ -428,6 +429,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
self.data.metrics.sync_items_sent.add( self.data.metrics.sync_items_sent.add(
&Context::current(),
values.len() as u64, values.len() as u64,
&[ &[
KeyValue::new("table_name", F::TABLE_NAME), KeyValue::new("table_name", F::TABLE_NAME),
@ -470,6 +472,7 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
} }
SyncRpc::Items(items) => { SyncRpc::Items(items) => {
self.data.metrics.sync_items_received.add( self.data.metrics.sync_items_received.add(
&Context::current(),
items.len() as u64, items.len() as u64,
&[ &[
KeyValue::new("table_name", F::TABLE_NAME), KeyValue::new("table_name", F::TABLE_NAME),

View file

@ -9,7 +9,7 @@ use serde_bytes::ByteBuf;
use opentelemetry::{ use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer}, trace::{FutureExt, TraceContextExt, Tracer},
Context, Context, KeyValue,
}; };
use garage_db as db; use garage_db as db;
@ -110,7 +110,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.await?; .await?;
self.data.metrics.put_request_counter.add(1); self.data.metrics.put_request_counter.add(
&Context::current(),
1,
&[KeyValue::new("table_name", F::TABLE_NAME)],
);
Ok(()) Ok(())
} }
@ -154,7 +158,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.await?; .await?;
self.data.metrics.put_request_counter.add(1); self.data.metrics.put_request_counter.add(
&Context::current(),
1,
&[KeyValue::new("table_name", F::TABLE_NAME)],
);
Ok(()) Ok(())
} }
@ -220,7 +228,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.await?; .await?;
self.data.metrics.get_request_counter.add(1); self.data.metrics.get_request_counter.add(
&Context::current(),
1,
&[KeyValue::new("table_name", F::TABLE_NAME)],
);
Ok(res) Ok(res)
} }
@ -306,7 +318,11 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
.with_context(Context::current_with_span(span)) .with_context(Context::current_with_span(span))
.await?; .await?;
self.data.metrics.get_request_counter.add(1); self.data.metrics.get_request_counter.add(
&Context::current(),
1,
&[KeyValue::new("table_name", F::TABLE_NAME)],
);
Ok(res) Ok(res)
} }