From 1e6e8db264942701db2151c6aa030734cb54dd73 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Fri, 27 Jan 2023 17:04:16 +0000 Subject: [PATCH] block: Ported to opentelemetry 0.18. --- src/block/manager.rs | 29 +++++++++----- src/block/metrics.rs | 94 ++++++++++++++++---------------------------- src/block/resync.rs | 24 +++++++---- 3 files changed, 69 insertions(+), 78 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 051a9f93..6fc8b741 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -129,12 +129,17 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let metrics = BlockManagerMetrics::new( - compression_level, - rc.rc.clone(), - resync.queue.clone(), - resync.errors.clone(), - ); + let metrics = + BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + + match compression_level { + Some(v) => metrics + .compression_level + .observe(&Context::current(), v as u64, &[]), + None => metrics + .compression_level + .observe(&Context::current(), 0_u64, &[]), + } let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -475,7 +480,9 @@ impl BlockManager { )) .await?; - self.metrics.bytes_written.add(write_size); + self.metrics + .bytes_written + .add(&Context::current(), write_size, &[]); Ok(()) } @@ -510,7 +517,7 @@ impl BlockManager { self.metrics .bytes_read - .add(data.inner_buffer().len() as u64); + .add(&Context::current(), data.inner_buffer().len() as u64, &[]); Ok(data) } @@ -542,7 +549,9 @@ impl BlockManager { }; if data.verify(*hash).is_err() { - self.metrics.corruption_counter.add(1); + self.metrics + .corruption_counter + .add(&Context::current(), 1, &[]); self.lock_mutate(hash) .await @@ -742,7 +751,7 @@ impl BlockManagerLocked { path.set_extension("zst"); } fs::remove_file(path).await?; - mgr.metrics.delete_counter.add(1); + mgr.metrics.delete_counter.add(&Context::current(), 1, &[]); } Ok(()) } diff --git 
a/src/block/metrics.rs b/src/block/metrics.rs index 6659df32..a1ae0af4 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -5,82 +5,61 @@ use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { - pub(crate) _compression_level: ValueObserver<u64>, - pub(crate) _rc_size: ValueObserver<u64>, - pub(crate) _resync_queue_len: ValueObserver<u64>, - pub(crate) _resync_errored_blocks: ValueObserver<u64>, + pub(crate) compression_level: ObservableGauge<u64>, + pub(crate) _rc_size: ObservableGauge<u64>, + pub(crate) _resync_queue_len: ObservableGauge<u64>, + pub(crate) _resync_errored_blocks: ObservableGauge<u64>, - pub(crate) resync_counter: BoundCounter<u64>, - pub(crate) resync_error_counter: BoundCounter<u64>, - pub(crate) resync_duration: BoundValueRecorder<f64>, + pub(crate) resync_counter: Counter<u64>, + pub(crate) resync_error_counter: Counter<u64>, + pub(crate) resync_duration: Histogram<f64>, pub(crate) resync_send_counter: Counter<u64>, - pub(crate) resync_recv_counter: BoundCounter<u64>, + pub(crate) resync_recv_counter: Counter<u64>, - pub(crate) bytes_read: BoundCounter<u64>, - pub(crate) block_read_duration: BoundValueRecorder<f64>, - pub(crate) bytes_written: BoundCounter<u64>, - pub(crate) block_write_duration: BoundValueRecorder<f64>, - pub(crate) delete_counter: BoundCounter<u64>, + pub(crate) bytes_read: Counter<u64>, + pub(crate) block_read_duration: Histogram<f64>, + pub(crate) bytes_written: Counter<u64>, + pub(crate) block_write_duration: Histogram<f64>, + pub(crate) delete_counter: Counter<u64>, - pub(crate) corruption_counter: BoundCounter<u64>, + pub(crate) corruption_counter: Counter<u64>, } impl BlockManagerMetrics { - pub fn new( - compression_level: Option<i32>, - rc_tree: db::Tree, - resync_queue: CountedTree, - resync_errors: CountedTree, - ) -> Self { + pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { - _compression_level: meter - .u64_value_observer("block.compression_level", move |observer| { - 
match compression_level { - Some(v) => observer.observe(v as u64, &[]), - None => observer.observe(0_u64, &[]), - } - }) + compression_level: meter + .u64_observable_gauge("block.compression_level") .with_description("Garage compression level for node") .init(), _rc_size: meter - .u64_value_observer("block.rc_size", move |observer| { - if let Ok(Some(v)) = rc_tree.fast_len() { - observer.observe(v as u64, &[]) - } - }) + .u64_observable_gauge("block.rc_size") .with_description("Number of blocks known to the reference counter") .init(), _resync_queue_len: meter - .u64_value_observer("block.resync_queue_length", move |observer| { - observer.observe(resync_queue.len() as u64, &[]) - }) + .u64_observable_gauge("block.resync_queue_length") .with_description( "Number of block hashes queued for local check and possible resync", ) .init(), _resync_errored_blocks: meter - .u64_value_observer("block.resync_errored_blocks", move |observer| { - observer.observe(resync_errors.len() as u64, &[]) - }) + .u64_observable_gauge("block.resync_errored_blocks") .with_description("Number of block hashes whose last resync resulted in an error") .init(), resync_counter: meter .u64_counter("block.resync_counter") .with_description("Number of calls to resync_block") - .init() - .bind(&[]), + .init(), resync_error_counter: meter .u64_counter("block.resync_error_counter") .with_description("Number of calls to resync_block that returned an error") - .init() - .bind(&[]), + .init(), resync_duration: meter - .f64_value_recorder("block.resync_duration") + .f64_histogram("block.resync_duration") .with_description("Duration of resync_block operations") - .init() - .bind(&[]), + .init(), resync_send_counter: meter .u64_counter("block.resync_send_counter") .with_description("Number of blocks sent to another node in resync operations") @@ -88,40 +67,33 @@ impl BlockManagerMetrics { resync_recv_counter: meter .u64_counter("block.resync_recv_counter") .with_description("Number of blocks received from 
other nodes in resync operations") - .init() - .bind(&[]), + .init(), bytes_read: meter .u64_counter("block.bytes_read") .with_description("Number of bytes read from disk") - .init() - .bind(&[]), + .init(), block_read_duration: meter - .f64_value_recorder("block.read_duration") + .f64_histogram("block.read_duration") .with_description("Duration of block read operations") - .init() - .bind(&[]), + .init(), bytes_written: meter .u64_counter("block.bytes_written") .with_description("Number of bytes written to disk") - .init() - .bind(&[]), + .init(), block_write_duration: meter - .f64_value_recorder("block.write_duration") + .f64_histogram("block.write_duration") .with_description("Duration of block write operations") - .init() - .bind(&[]), + .init(), delete_counter: meter .u64_counter("block.delete_counter") .with_description("Number of blocks deleted") - .init() - .bind(&[]), + .init(), corruption_counter: meter .u64_counter("block.corruption_counter") .with_description("Data corruptions detected on block reads") - .init() - .bind(&[]), + .init(), } } } diff --git a/src/block/resync.rs b/src/block/resync.rs index ea280ad4..865fb289 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -302,10 +302,16 @@ impl BlockResyncManager { .bound_record_duration(&manager.metrics.resync_duration) .await; - manager.metrics.resync_counter.add(1); + manager + .metrics + .resync_counter + .add(&Context::current(), 1, &[]); if let Err(e) = &res { - manager.metrics.resync_error_counter.add(1); + manager + .metrics + .resync_error_counter + .add(&Context::current(), 1, &[]); error!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.errors.get(hash.as_slice())? 
{ @@ -413,10 +419,11 @@ impl BlockResyncManager { ); for node in need_nodes.iter() { - manager - .metrics - .resync_send_counter - .add(1, &[KeyValue::new("to", format!("{:?}", node))]); + manager.metrics.resync_send_counter.add( + &Context::current(), + 1, + &[KeyValue::new("to", format!("{:?}", node))], + ); } let block = manager.read_block(hash).await?; @@ -459,7 +466,10 @@ impl BlockResyncManager { let block_data = manager.rpc_get_raw_block(hash, None).await?; - manager.metrics.resync_recv_counter.add(1); + manager + .metrics + .resync_recv_counter + .add(&Context::current(), 1, &[]); manager.write_block(hash, &block_data).await?; }