From efc87a8b8e73c422995f70ad0851c41b55d8de6f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 14 Aug 2024 22:20:08 +0200 Subject: [PATCH 1/8] add proxy to instrument LmdbDB with otel --- Cargo.lock | 1 + src/db/Cargo.toml | 1 + src/db/lib.rs | 3 + src/db/lmdb_adapter.rs | 2 +- src/db/metric_proxy.rs | 140 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 src/db/metric_proxy.rs diff --git a/Cargo.lock b/Cargo.lock index 9cb4b57e..384dda91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1448,6 +1448,7 @@ dependencies = [ "heed", "hexdump", "mktemp", + "opentelemetry", "r2d2", "r2d2_sqlite", "rusqlite", diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index ef5a8659..aefbb960 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -15,6 +15,7 @@ path = "lib.rs" err-derive.workspace = true hexdump.workspace = true tracing.workspace = true +opentelemetry.workspace = true heed = { workspace = true, optional = true } rusqlite = { workspace = true, optional = true, features = ["backup"] } diff --git a/src/db/lib.rs b/src/db/lib.rs index c8f9e13f..504f1b4c 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -3,6 +3,9 @@ extern crate tracing; #[cfg(feature = "lmdb")] pub mod lmdb_adapter; +#[cfg(feature = "lmdb")] +pub mod metric_proxy; + #[cfg(feature = "sqlite")] pub mod sqlite_adapter; diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index d5066664..ecfe5e1e 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -226,7 +226,7 @@ impl IDb for LmdbDb { // ---- -struct LmdbTx<'a> { +pub(crate) struct LmdbTx<'a> { trees: &'a [Database], tx: RwTxn<'a, 'a>, } diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs new file mode 100644 index 00000000..e387715e --- /dev/null +++ b/src/db/metric_proxy.rs @@ -0,0 +1,140 @@ +use std::path::PathBuf; +use std::time::Instant; + +use crate::lmdb_adapter::{LmdbDb, LmdbTx}; +use crate::{Bound, IDb, ITx, ITxFn, OnCommit, Result, TxResult, Value, ValueIter}; +use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, + KeyValue, +}; + +pub struct MetricDbProxy { + db: LmdbDb, + op_counter: Counter, + op_duration: ValueRecorder, +} + +impl MetricDbProxy { + pub fn init(db: LmdbDb) -> MetricDbProxy { + let meter = global::meter("garage/web"); + Self { + db, + op_counter: meter + .u64_counter("db.op_counter") + .with_description("Number of operations on the local metadata engine") + .init(), + op_duration: meter + .f64_value_recorder("db.op_duration") + .with_description("Duration of operations on the local metadata engine") + .init(), + } + } + + fn instrument( + &self, + fx: impl FnOnce() -> T, + op: &'static str, + cat: &'static str, + tx: &'static str, + ) -> T { + let metric_tags = [ + KeyValue::new("op", op), + KeyValue::new("cat", cat), + KeyValue::new("tx", tx), + ]; + self.op_counter.add(1, &metric_tags); + + let request_start = Instant::now(); + let res = fx(); + self.op_duration.record( + Instant::now() + .saturating_duration_since(request_start) + .as_secs_f64(), + &metric_tags, + ); + + res + } +} + +impl IDb for MetricDbProxy { + fn engine(&self) -> String { + format!("Metric Proxy on {}", self.db.engine()) + } + + fn open_tree(&self, name: &str) -> Result { + self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no") + } + + fn list_trees(&self) -> Result> { + self.instrument(|| self.db.list_trees(), "list_trees", "control", "no") + } + + fn snapshot(&self, to: &PathBuf) -> Result<()> { + self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no") + } + + // --- + + fn get(&self, tree: usize, key: &[u8]) -> Result> { + self.instrument(|| self.db.get(tree, key), "get", "data", "no") + } + + fn len(&self, tree: usize) -> Result { + self.instrument(|| self.db.len(tree), "len", "data", "no") + } + + fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result> { + self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no") + } + + fn remove(&self, tree: usize, key: &[u8]) -> Result> { + self.instrument(|| self.db.remove(tree, key), "remove", "data", "no") + } + + fn clear(&self, tree: usize) -> Result<()> { + self.instrument(|| self.db.clear(tree), "clear", "data", "no") + } + + fn iter(&self, tree: usize) -> Result> { + self.instrument(|| self.db.iter(tree), "iter", "data", "no") + } + + fn iter_rev(&self, tree: usize) -> Result> { + self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no") + } + + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + self.instrument(|| self.db.range(tree, low, high), "range", "data", "no") + } + + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> Result> { + self.instrument( + || self.db.range_rev(tree, low, high), + "range_rev", + "data", + "no", + ) + } + + // ---- + + fn transaction(&self, f: &dyn ITxFn) -> TxResult { + self.instrument(|| self.db.transaction(f), "transaction", "control", "yes") + } +} + +struct MetricTxProxy<'a> { + tx: LmdbTx<'a>, +} -- 2.45.2 From 56de00945fbad7a5f6c0363143223ea01e1d9126 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 14 Aug 2024 23:12:46 +0200 Subject: [PATCH 2/8] proxy finalized, use it by setting 'lmdb-with-metrics' --- src/db/lmdb_adapter.rs | 11 +++-- src/db/metric_proxy.rs | 97 +++++++++++++++++++++++++++++++++++++++--- src/db/open.rs | 13 +++++- src/model/garage.rs | 2 +- 4 files changed, 109 insertions(+), 14 deletions(-) diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index ecfe5e1e..a8fa15e8 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -39,12 +39,15 @@ pub struct LmdbDb { } impl LmdbDb { - pub fn init(db: Env) -> Db { - let s = Self { + pub fn to_wrap(db: Env) -> Self { + Self { db, trees: RwLock::new((Vec::new(), HashMap::new())), - }; - Db(Arc::new(s)) + } + } + + pub fn init(db: Env) -> Db { + Db(Arc::new(Self::to_wrap(db))) } fn get_tree(&self, i: usize) -> Result { diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index e387715e..0449620a 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -1,8 +1,12 @@ use std::path::PathBuf; +use std::sync::Arc; use std::time::Instant; -use crate::lmdb_adapter::{LmdbDb, LmdbTx}; -use crate::{Bound, IDb, ITx, ITxFn, OnCommit, Result, TxResult, Value, ValueIter}; +use crate::lmdb_adapter::LmdbDb; +use crate::{ + Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter, + Value, ValueIter, +}; use opentelemetry::{ global, metrics::{Counter, ValueRecorder}, @@ -10,15 +14,16 @@ use opentelemetry::{ }; pub struct MetricDbProxy { + //@FIXME Replace with a template db: LmdbDb, op_counter: Counter, op_duration: ValueRecorder, } impl MetricDbProxy { - pub fn init(db: LmdbDb) -> MetricDbProxy { + pub fn init(db: LmdbDb) -> Db { let meter = global::meter("garage/web"); - Self { + let s = Self { db, op_counter: meter .u64_counter("db.op_counter") @@ -28,7 +33,8 @@ impl MetricDbProxy { .f64_value_recorder("db.op_duration") .with_description("Duration of operations on the local metadata engine") .init(), - } + }; + Db(Arc::new(s)) } fn instrument( @@ -131,10 +137,87 @@ impl IDb for MetricDbProxy { // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult { - self.instrument(|| self.db.transaction(f), "transaction", "control", "yes") + self.instrument( + || self.db.transaction(&MetricITxFnProxy { f, metrics: self }), + "transaction", + "control", + "yes", + ) + } +} + +struct MetricITxFnProxy<'a> { + f: &'a dyn ITxFn, + metrics: &'a MetricDbProxy, +} +impl<'a> ITxFn for MetricITxFnProxy<'a> { + fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult { + self.f.try_on(&mut MetricTxProxy { + tx, + metrics: self.metrics, + }) } } struct MetricTxProxy<'a> { - tx: LmdbTx<'a>, + tx: &'a mut dyn ITx, + metrics: &'a MetricDbProxy, +} +impl<'a> ITx for MetricTxProxy<'a> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.get(tree, key), "get", "data", "yes") + } + + fn len(&self, tree: usize) -> TxOpResult { + self.metrics + .instrument(|| self.tx.len(tree), "len", "data", "yes") + } + + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes") + } + + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.remove(tree, key), "remove", "data", "yes") + } + + fn clear(&mut self, tree: usize) -> TxOpResult<()> { + self.metrics + .instrument(|| self.tx.clear(tree), "clear", "data", "yes") + } + + fn iter(&self, tree: usize) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.iter(tree), "iter", "data", "yes") + } + fn iter_rev(&self, tree: usize) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes") + } + fn range<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult> { + self.metrics + .instrument(|| self.tx.range(tree, low, high), "range", "data", "yes") + } + + fn range_rev<'r>( + &self, + tree: usize, + low: Bound<&'r [u8]>, + high: Bound<&'r [u8]>, + ) -> TxOpResult> { + self.metrics.instrument( + || self.tx.range_rev(tree, low, high), + "range_rev", + "data", + "yes", + ) + } } diff --git a/src/db/open.rs b/src/db/open.rs index b8de3cd7..7114fed6 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -10,6 +10,7 @@ use crate::{Db, Error, Result}; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Engine { Lmdb, + LmdbWithMetrics, Sqlite, } @@ -18,6 +19,7 @@ impl Engine { pub fn as_str(&self) -> &'static str { match self { Self::Lmdb => "lmdb", + Self::LmdbWithMetrics => "lmdb-with-metrics", Self::Sqlite => "sqlite", } } @@ -35,6 +37,7 @@ impl std::str::FromStr for Engine { fn from_str(text: &str) -> Result { match text { "lmdb" | "heed" => Ok(Self::Lmdb), + "lmdb-with-metrics" | "heed-with-metrics" => Ok(Self::LmdbWithMetrics), "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite), "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())), kind => Err(Error( @@ -74,7 +77,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { // ---- LMDB DB ---- #[cfg(feature = "lmdb")] - Engine::Lmdb => { + Engine::Lmdb | Engine::LmdbWithMetrics => { info!("Opening LMDB database at: {}", path.display()); if let Err(e) = std::fs::create_dir_all(&path) { return Err(Error( @@ -109,7 +112,13 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { )) } Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())), - Ok(db) => Ok(crate::lmdb_adapter::LmdbDb::init(db)), + Ok(db) => match engine { + Engine::LmdbWithMetrics => { + let to_wrap = crate::lmdb_adapter::LmdbDb::to_wrap(db); + Ok(crate::metric_proxy::MetricDbProxy::init(to_wrap)) + } + _ => Ok(crate::lmdb_adapter::LmdbDb::init(db)), + }, } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 363b02dd..91add8ac 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -121,7 +121,7 @@ impl Garage { db::Engine::Sqlite => { db_path.push("db.sqlite"); } - db::Engine::Lmdb => { + db::Engine::Lmdb | db::Engine::LmdbWithMetrics => { db_path.push("db.lmdb"); } } -- 2.45.2 From 9d01a9870ca35e5d768d37bf058fb932f698c89b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 15 Aug 2024 09:10:39 +0200 Subject: [PATCH 3/8] fix cargo.nix --- Cargo.nix | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.nix b/Cargo.nix index c2be3161..a791fecd 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -34,7 +34,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "fc41fb639a69d62c8c0fb3f9c227162162ebc8142c6fa5cd0599dc381dcd9ebb"; + nixifiedLockHash = "5826893fb6082581d96992c846c892c623f3b1b17cefba3b6b8ca10c9f45c589"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -2104,6 +2104,7 @@ in err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/lmdb" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/heed" || rootFeatures' ? "garage_db/lmdb" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/lmdb" then "heed" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".heed."0.11.0" { inherit profileName; }).out; hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out; + opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2."0.8.10" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2_sqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2_sqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2_sqlite."0.24.0" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/bundled-libs" || rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/bundled-libs" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/rusqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "rusqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusqlite."0.31.0" { inherit profileName; }).out; -- 2.45.2 From 1685d83c04f569ed66e706f3a548c20e9eb423b8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 15 Aug 2024 14:32:01 +0200 Subject: [PATCH 4/8] switch from sec to us --- src/db/metric_proxy.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index 0449620a..c0856a57 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -9,7 +9,7 @@ use crate::{ }; use opentelemetry::{ global, - metrics::{Counter, ValueRecorder}, + metrics::{Counter, Unit, ValueRecorder}, KeyValue, }; @@ -32,6 +32,7 @@ impl MetricDbProxy { op_duration: meter .f64_value_recorder("db.op_duration") .with_description("Duration of operations on the local metadata engine") + .with_unit(Unit::new("us")) .init(), }; Db(Arc::new(s)) @@ -53,12 +54,12 @@ impl MetricDbProxy { let request_start = Instant::now(); let res = fx(); - self.op_duration.record( - Instant::now() - .saturating_duration_since(request_start) - .as_secs_f64(), - &metric_tags, - ); + let delay_nanos = Instant::now() + .saturating_duration_since(request_start) + .as_nanos(); + let delay_micro: f64 = delay_nanos as f64 / 1000.0f64; + println!("delay {}", delay_micro); + self.op_duration.record(delay_micro, &metric_tags); res } -- 2.45.2 From 2d439c388c03eaf36f0c8675df9cac57d08ff9f6 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 15 Aug 2024 15:30:56 +0200 Subject: [PATCH 5/8] switch from micros to millis --- src/db/metric_proxy.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index c0856a57..469f8088 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -32,7 +32,7 @@ impl MetricDbProxy { op_duration: meter .f64_value_recorder("db.op_duration") .with_description("Duration of operations on the local metadata engine") - .with_unit(Unit::new("us")) + .with_unit(Unit::new("ms")) .init(), }; Db(Arc::new(s)) @@ -57,9 +57,8 @@ impl MetricDbProxy { let delay_nanos = Instant::now() .saturating_duration_since(request_start) .as_nanos(); - let delay_micro: f64 = delay_nanos as f64 / 1000.0f64; - println!("delay {}", delay_micro); - self.op_duration.record(delay_micro, &metric_tags); + let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64; + self.op_duration.record(delay_millis, &metric_tags); res } -- 2.45.2 From 14163b5853aea4357f4e17bb341ccd50ba3c89f1 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 15 Aug 2024 15:43:15 +0200 Subject: [PATCH 6/8] switch to ms, simplify collected metrics --- src/db/metric_proxy.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index 469f8088..d246e7fe 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -16,8 +16,7 @@ use opentelemetry::{ pub struct MetricDbProxy { //@FIXME Replace with a template db: LmdbDb, - op_counter: Counter, - op_duration: ValueRecorder, + op: ValueRecorder, } impl MetricDbProxy { @@ -25,13 +24,9 @@ impl MetricDbProxy { let meter = global::meter("garage/web"); let s = Self { db, - op_counter: meter - .u64_counter("db.op_counter") - .with_description("Number of operations on the local metadata engine") - .init(), - op_duration: meter - .f64_value_recorder("db.op_duration") - .with_description("Duration of operations on the local metadata engine") + op: meter + .f64_value_recorder("db.op") + .with_description("Duration and amount of operations on the local metadata engine") .with_unit(Unit::new("ms")) .init(), }; @@ -50,7 +45,6 @@ impl MetricDbProxy { KeyValue::new("cat", cat), KeyValue::new("tx", tx), ]; - self.op_counter.add(1, &metric_tags); let request_start = Instant::now(); let res = fx(); @@ -58,7 +52,7 @@ impl MetricDbProxy { .saturating_duration_since(request_start) .as_nanos(); let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64; - self.op_duration.record(delay_millis, &metric_tags); + self.op.record(delay_millis, &metric_tags); res } -- 2.45.2 From 306a74379a7b5e56699ac6cdd98ae2c6e98efc0d Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 17 Aug 2024 13:16:55 +0200 Subject: [PATCH 7/8] add metrics to workers --- src/db/metric_proxy.rs | 4 ++-- src/util/background/worker.rs | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs index d246e7fe..e5221a7f 100644 --- a/src/db/metric_proxy.rs +++ b/src/db/metric_proxy.rs @@ -9,7 +9,7 @@ use crate::{ }; use opentelemetry::{ global, - metrics::{Counter, Unit, ValueRecorder}, + metrics::{Unit, ValueRecorder}, KeyValue, }; @@ -21,7 +21,7 @@ pub struct MetricDbProxy { impl MetricDbProxy { pub fn init(db: LmdbDb) -> Db { - let meter = global::meter("garage/web"); + let meter = global::meter("garage/db"); let s = Self { db, op: meter diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 8165e2cb..22f747b6 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use async_trait::async_trait; use futures::future::*; use futures::stream::FuturesUnordered; use futures::StreamExt; +use opentelemetry::{global, metrics::ValueRecorder, KeyValue}; use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor { stop_signal: watch::Receiver, worker_chan: mpsc::UnboundedReceiver>, worker_info: Arc>>, + metrics: ValueRecorder, } impl WorkerProcessor { @@ -70,10 +73,15 @@ impl WorkerProcessor { stop_signal: watch::Receiver, worker_info: Arc>>, ) -> Self { + let meter = global::meter("garage/util"); Self { stop_signal, worker_chan, worker_info, + metrics: meter + .f64_value_recorder("util.worker_step") + .with_description("Duration and amount of worker steps executed") + .init(), } } @@ -103,6 +111,7 @@ impl WorkerProcessor { errors: 0, consecutive_errors: 0, last_error: None, + metrics: self.metrics.clone(), }; workers.push(async move { worker.step().await; @@ -183,10 +192,13 @@ struct WorkerHandler { errors: usize, consecutive_errors: usize, last_error: Option<(String, u64)>, + metrics: ValueRecorder, } impl WorkerHandler { async fn step(&mut self) { + let request_start = Instant::now(); + //@FIXME we also want to track errors in metrics but I don't know how yet. match self.state { WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { @@ -229,5 +241,17 @@ impl WorkerHandler { } WorkerState::Done => unreachable!(), } + + // metrics + let metric_tags = [ + KeyValue::new("state", self.state.to_string()), + KeyValue::new("name", self.worker.name()), + KeyValue::new("id", format!("{}", self.task_id)), + ]; + + let delay_secs = Instant::now() + .saturating_duration_since(request_start) + .as_secs_f64(); + self.metrics.record(delay_secs, &metric_tags); } } -- 2.45.2 From 1ebaf7aa17672bdc6e83e6c04c4c13e142f57629 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Sat, 17 Aug 2024 13:17:16 +0200 Subject: [PATCH 8/8] force flag "no read ahead" on LMDB --- src/db/open.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/db/open.rs b/src/db/open.rs index 7114fed6..057c7ae4 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -95,6 +95,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { env_builder.map_size(map_size); env_builder.max_readers(2048); unsafe { + env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoRdAhead); env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoMetaSync); if !opt.fsync { env_builder.flag(heed::flags::Flags::MdbNoSync); -- 2.45.2