diff --git a/Cargo.lock b/Cargo.lock
index 9cb4b57e..384dda91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1448,6 +1448,7 @@ dependencies = [
  "heed",
  "hexdump",
  "mktemp",
+ "opentelemetry",
  "r2d2",
  "r2d2_sqlite",
  "rusqlite",
diff --git a/Cargo.nix b/Cargo.nix
index c2be3161..a791fecd 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -34,7 +34,7 @@ args@{
   ignoreLockHash,
 }: let
-  nixifiedLockHash = "fc41fb639a69d62c8c0fb3f9c227162162ebc8142c6fa5cd0599dc381dcd9ebb";
+  nixifiedLockHash = "5826893fb6082581d96992c846c892c623f3b1b17cefba3b6b8ca10c9f45c589";
   workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
   currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
   lockHashIgnored = if ignoreLockHash
@@ -2104,6 +2104,7 @@ in
     err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
     ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/lmdb" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/heed" || rootFeatures' ? "garage_db/lmdb" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/lmdb" then "heed" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".heed."0.11.0" { inherit profileName; }).out;
     hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out;
+    opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
     ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2."0.8.10" { inherit profileName; }).out;
     ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2_sqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2_sqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2_sqlite."0.24.0" { inherit profileName; }).out;
     ${ if rootFeatures' ? "garage/bundled-libs" || rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/bundled-libs" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/rusqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "rusqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusqlite."0.31.0" { inherit profileName; }).out;
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index ef5a8659..aefbb960 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -15,6 +15,7 @@ path = "lib.rs"
 err-derive.workspace = true
 hexdump.workspace = true
 tracing.workspace = true
+opentelemetry.workspace = true
 
 heed = { workspace = true, optional = true }
 rusqlite = { workspace = true, optional = true, features = ["backup"] }
diff --git a/src/db/lib.rs b/src/db/lib.rs
index c8f9e13f..504f1b4c 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -3,6 +3,9 @@ extern crate tracing;
 
 #[cfg(feature = "lmdb")]
 pub mod lmdb_adapter;
+#[cfg(feature = "lmdb")]
+pub mod metric_proxy;
+
 #[cfg(feature = "sqlite")]
 pub mod sqlite_adapter;
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index d5066664..a8fa15e8 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -39,12 +39,15 @@ pub struct LmdbDb {
 }
 
 impl LmdbDb {
-	pub fn init(db: Env) -> Db {
-		let s = Self {
+	pub fn to_wrap(db: Env) -> Self {
+		Self {
 			db,
 			trees: RwLock::new((Vec::new(), HashMap::new())),
-		};
-		Db(Arc::new(s))
+		}
+	}
+
+	pub fn init(db: Env) -> Db {
+		Db(Arc::new(Self::to_wrap(db)))
 	}
 
 	fn get_tree(&self, i: usize) -> Result<Database> {
@@ -226,7 +229,7 @@ impl IDb for LmdbDb {
 
 // ----
 
-struct LmdbTx<'a> {
+pub(crate) struct LmdbTx<'a> {
 	trees: &'a [Database],
 	tx: RwTxn<'a, 'a>,
 }
diff --git a/src/db/metric_proxy.rs b/src/db/metric_proxy.rs
new file mode 100644
index 00000000..e5221a7f
--- /dev/null
+++ b/src/db/metric_proxy.rs
@@ -0,0 +1,217 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use crate::lmdb_adapter::LmdbDb;
+use crate::{
+	Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
+	Value, ValueIter,
+};
+use opentelemetry::{
+	global,
+	metrics::{Unit, ValueRecorder},
+	KeyValue,
+};
+
+pub struct MetricDbProxy {
+	//@FIXME Replace with a template
+	db: LmdbDb,
+	op: ValueRecorder<f64>,
+}
+
+impl MetricDbProxy {
+	pub fn init(db: LmdbDb) -> Db {
+		let meter = global::meter("garage/db");
+		let s = Self {
+			db,
+			op: meter
+				.f64_value_recorder("db.op")
+				.with_description("Duration and amount of operations on the local metadata engine")
+				.with_unit(Unit::new("ms"))
+				.init(),
+		};
+		Db(Arc::new(s))
+	}
+
+	fn instrument<T>(
+		&self,
+		fx: impl FnOnce() -> T,
+		op: &'static str,
+		cat: &'static str,
+		tx: &'static str,
+	) -> T {
+		let metric_tags = [
+			KeyValue::new("op", op),
+			KeyValue::new("cat", cat),
+			KeyValue::new("tx", tx),
+		];
+
+		let request_start = Instant::now();
+		let res = fx();
+		let delay_nanos = Instant::now()
+			.saturating_duration_since(request_start)
+			.as_nanos();
+		let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64;
+		self.op.record(delay_millis, &metric_tags);
+
+		res
+	}
+}
+
+impl IDb for MetricDbProxy {
+	fn engine(&self) -> String {
+		format!("Metric Proxy on {}", self.db.engine())
+	}
+
+	fn open_tree(&self, name: &str) -> Result<usize> {
+		self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no")
+	}
+
+	fn list_trees(&self) -> Result<Vec<String>> {
+		self.instrument(|| self.db.list_trees(), "list_trees", "control", "no")
+	}
+
+	fn snapshot(&self, to: &PathBuf) -> Result<()> {
+		self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no")
+	}
+
+	// ---
+
+	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+		self.instrument(|| self.db.get(tree, key), "get", "data", "no")
+	}
+
+	fn len(&self, tree: usize) -> Result<usize> {
+		self.instrument(|| self.db.len(tree), "len", "data", "no")
+	}
+
+	fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
+		self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no")
+	}
+
+	fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
+		self.instrument(|| self.db.remove(tree, key), "remove", "data", "no")
+	}
+
+	fn clear(&self, tree: usize) -> Result<()> {
+		self.instrument(|| self.db.clear(tree), "clear", "data", "no")
+	}
+
+	fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
+		self.instrument(|| self.db.iter(tree), "iter", "data", "no")
+	}
+
+	fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
+		self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no")
+	}
+
+	fn range<'r>(
+		&self,
+		tree: usize,
+		low: Bound<&'r [u8]>,
+		high: Bound<&'r [u8]>,
+	) -> Result<ValueIter<'_>> {
+		self.instrument(|| self.db.range(tree, low, high), "range", "data", "no")
+	}
+
+	fn range_rev<'r>(
+		&self,
+		tree: usize,
+		low: Bound<&'r [u8]>,
+		high: Bound<&'r [u8]>,
+	) -> Result<ValueIter<'_>> {
+		self.instrument(
+			|| self.db.range_rev(tree, low, high),
+			"range_rev",
+			"data",
+			"no",
+		)
+	}
+
+	// ----
+
+	fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
+		self.instrument(
+			|| self.db.transaction(&MetricITxFnProxy { f, metrics: self }),
+			"transaction",
+			"control",
+			"yes",
+		)
+	}
+}
+
+struct MetricITxFnProxy<'a> {
+	f: &'a dyn ITxFn,
+	metrics: &'a MetricDbProxy,
+}
+impl<'a> ITxFn for MetricITxFnProxy<'a> {
+	fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
+		self.f.try_on(&mut MetricTxProxy {
+			tx,
+			metrics: self.metrics,
+		})
+	}
+}
+
+struct MetricTxProxy<'a> {
+	tx: &'a mut dyn ITx,
+	metrics: &'a MetricDbProxy,
+}
+impl<'a> ITx for MetricTxProxy<'a> {
+	fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+		self.metrics
+			.instrument(|| self.tx.get(tree, key), "get", "data", "yes")
+	}
+
+	fn len(&self, tree: usize) -> TxOpResult<usize> {
+		self.metrics
+			.instrument(|| self.tx.len(tree), "len", "data", "yes")
+	}
+
+	fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
+		self.metrics
+			.instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes")
+	}
+
+	fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
+		self.metrics
+			.instrument(|| self.tx.remove(tree, key), "remove", "data", "yes")
+	}
+
+	fn clear(&mut self, tree: usize) -> TxOpResult<()> {
+		self.metrics
+			.instrument(|| self.tx.clear(tree), "clear", "data", "yes")
+	}
+
+	fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+		self.metrics
+			.instrument(|| self.tx.iter(tree), "iter", "data", "yes")
+	}
+	fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
+		self.metrics
+			.instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes")
+	}
+	fn range<'r>(
+		&self,
+		tree: usize,
+		low: Bound<&'r [u8]>,
+		high: Bound<&'r [u8]>,
+	) -> TxOpResult<TxValueIter<'_>> {
+		self.metrics
+			.instrument(|| self.tx.range(tree, low, high), "range", "data", "yes")
+	}
+
+	fn range_rev<'r>(
+		&self,
+		tree: usize,
+		low: Bound<&'r [u8]>,
+		high: Bound<&'r [u8]>,
+	) -> TxOpResult<TxValueIter<'_>> {
+		self.metrics.instrument(
+			|| self.tx.range_rev(tree, low, high),
+			"range_rev",
+			"data",
+			"yes",
+		)
+	}
+}
diff --git a/src/db/open.rs b/src/db/open.rs
index b8de3cd7..057c7ae4 100644
--- a/src/db/open.rs
+++ b/src/db/open.rs
@@ -10,6 +10,7 @@ use crate::{Db, Error, Result};
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum Engine {
 	Lmdb,
+	LmdbWithMetrics,
 	Sqlite,
 }
@@ -18,6 +19,7 @@ impl Engine {
 	pub fn as_str(&self) -> &'static str {
 		match self {
 			Self::Lmdb => "lmdb",
+			Self::LmdbWithMetrics => "lmdb-with-metrics",
 			Self::Sqlite => "sqlite",
 		}
 	}
@@ -35,6 +37,7 @@ impl std::str::FromStr for Engine {
 	fn from_str(text: &str) -> Result<Engine> {
 		match text {
 			"lmdb" | "heed" => Ok(Self::Lmdb),
+			"lmdb-with-metrics" | "heed-with-metrics" => Ok(Self::LmdbWithMetrics),
 			"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
 			"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
 			kind => Err(Error(
@@ -74,7 +77,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
 
 		// ---- LMDB DB ----
 		#[cfg(feature = "lmdb")]
-		Engine::Lmdb => {
+		Engine::Lmdb | Engine::LmdbWithMetrics => {
 			info!("Opening LMDB database at: {}", path.display());
 			if let Err(e) = std::fs::create_dir_all(&path) {
 				return Err(Error(
@@ -92,6 +95,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
 			env_builder.map_size(map_size);
 			env_builder.max_readers(2048);
 			unsafe {
+				env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoRdAhead);
 				env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoMetaSync);
 				if !opt.fsync {
 					env_builder.flag(heed::flags::Flags::MdbNoSync);
@@ -109,7 +113,13 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
 				))
 			}
 			Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())),
-			Ok(db) => Ok(crate::lmdb_adapter::LmdbDb::init(db)),
+			Ok(db) => match engine {
+				Engine::LmdbWithMetrics => {
+					let to_wrap = crate::lmdb_adapter::LmdbDb::to_wrap(db);
+					Ok(crate::metric_proxy::MetricDbProxy::init(to_wrap))
+				}
+				_ => Ok(crate::lmdb_adapter::LmdbDb::init(db)),
+			},
 		}
 	}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 363b02dd..91add8ac 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -121,7 +121,7 @@ impl Garage {
 			db::Engine::Sqlite => {
 				db_path.push("db.sqlite");
 			}
-			db::Engine::Lmdb => {
+			db::Engine::Lmdb | db::Engine::LmdbWithMetrics => {
 				db_path.push("db.lmdb");
 			}
 		}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 8165e2cb..22f747b6 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,11 +1,13 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::Instant;
 
 use async_trait::async_trait;
 use futures::future::*;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
+use opentelemetry::{global, metrics::ValueRecorder, KeyValue};
 use serde::{Deserialize, Serialize};
 use tokio::select;
 use tokio::sync::{mpsc, watch};
@@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor {
 	stop_signal: watch::Receiver<bool>,
 	worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
 	worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+	metrics: ValueRecorder<f64>,
 }
 
 impl WorkerProcessor {
@@ -70,10 +73,15 @@ impl WorkerProcessor {
 		stop_signal: watch::Receiver<bool>,
 		worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
 	) -> Self {
+		let meter = global::meter("garage/util");
 		Self {
 			stop_signal,
 			worker_chan,
 			worker_info,
+			metrics: meter
+				.f64_value_recorder("util.worker_step")
+				.with_description("Duration and amount of worker steps executed")
+				.init(),
 		}
 	}
 
@@ -103,6 +111,7 @@ impl WorkerProcessor {
 					errors: 0,
 					consecutive_errors: 0,
 					last_error: None,
+					metrics: self.metrics.clone(),
 				};
 				workers.push(async move {
 					worker.step().await;
@@ -183,10 +192,13 @@ struct WorkerHandler {
 	errors: usize,
 	consecutive_errors: usize,
 	last_error: Option<(String, u64)>,
+	metrics: ValueRecorder<f64>,
 }
 
 impl WorkerHandler {
 	async fn step(&mut self) {
+		let request_start = Instant::now();
+		//@FIXME we also want to track errors in metrics but I don't know how yet.
 		match self.state {
 			WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
 				Ok(s) => {
@@ -229,5 +241,17 @@ impl WorkerHandler {
 			}
 			WorkerState::Done => unreachable!(),
 		}
+
+		// metrics
+		let metric_tags = [
+			KeyValue::new("state", self.state.to_string()),
+			KeyValue::new("name", self.worker.name()),
+			KeyValue::new("id", format!("{}", self.task_id)),
+		];
+
+		let delay_secs = Instant::now()
+			.saturating_duration_since(request_start)
+			.as_secs_f64();
+		self.metrics.record(delay_secs, &metric_tags);
 	}
 }
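Usage sketch (an assumption; the configuration plumbing is not part of this diff): the new variant is parsed by Engine::from_str, so it should be selectable through Garage's existing db_engine option, e.g. in garage.toml:

    db_engine = "lmdb-with-metrics"

Both recorders are registered on global meters ("garage/db" for db.op, "garage/util" for util.worker_step), so the new series should be exported by whatever OpenTelemetry exporter the binary already sets up (for example the admin API's Prometheus endpoint), with op/cat/tx labels on metadata-engine operations and state/name/id labels on worker steps.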