Compare commits
8 commits: main...feat-metri

1ebaf7aa17
306a74379a
14163b5853
2d439c388c
1685d83c04
9d01a9870c
56de00945f
efc87a8b8e
9 changed files with 269 additions and 9 deletions
Cargo.lock (generated)

@@ -1448,6 +1448,7 @@ dependencies = [
  "heed",
  "hexdump",
  "mktemp",
+ "opentelemetry",
  "r2d2",
  "r2d2_sqlite",
  "rusqlite",

Cargo.nix

@@ -34,7 +34,7 @@ args@{
   ignoreLockHash,
 }:
 let
-  nixifiedLockHash = "fc41fb639a69d62c8c0fb3f9c227162162ebc8142c6fa5cd0599dc381dcd9ebb";
+  nixifiedLockHash = "5826893fb6082581d96992c846c892c623f3b1b17cefba3b6b8ca10c9f45c589";
   workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
   currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
   lockHashIgnored = if ignoreLockHash
@@ -2104,6 +2104,7 @@ in
   err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
   ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/lmdb" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/heed" || rootFeatures' ? "garage_db/lmdb" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/lmdb" then "heed" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".heed."0.11.0" { inherit profileName; }).out;
   hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out;
+  opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
   ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2."0.8.10" { inherit profileName; }).out;
   ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/r2d2_sqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "r2d2_sqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".r2d2_sqlite."0.24.0" { inherit profileName; }).out;
   ${ if rootFeatures' ? "garage/bundled-libs" || rootFeatures' ? "garage/default" || rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_db/bundled-libs" || rootFeatures' ? "garage_db/default" || rootFeatures' ? "garage_db/rusqlite" || rootFeatures' ? "garage_db/sqlite" || rootFeatures' ? "garage_model/default" || rootFeatures' ? "garage_model/sqlite" then "rusqlite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusqlite."0.31.0" { inherit profileName; }).out;

src/db/Cargo.toml

@@ -15,6 +15,7 @@ path = "lib.rs"
 err-derive.workspace = true
 hexdump.workspace = true
 tracing.workspace = true
+opentelemetry.workspace = true
 
 heed = { workspace = true, optional = true }
 rusqlite = { workspace = true, optional = true, features = ["backup"] }

src/db/lib.rs

@@ -3,6 +3,9 @@ extern crate tracing;
 
 #[cfg(feature = "lmdb")]
 pub mod lmdb_adapter;
+#[cfg(feature = "lmdb")]
+pub mod metric_proxy;
+
 #[cfg(feature = "sqlite")]
 pub mod sqlite_adapter;
 
src/db/lmdb_adapter.rs

@@ -39,12 +39,15 @@ pub struct LmdbDb {
 }
 
 impl LmdbDb {
-    pub fn init(db: Env) -> Db {
-        let s = Self {
+    pub fn to_wrap(db: Env) -> Self {
+        Self {
             db,
             trees: RwLock::new((Vec::new(), HashMap::new())),
-        };
-        Db(Arc::new(s))
+        }
+    }
+
+    pub fn init(db: Env) -> Db {
+        Db(Arc::new(Self::to_wrap(db)))
     }
 
     fn get_tree(&self, i: usize) -> Result<Database> {
@@ -226,7 +229,7 @@ impl IDb for LmdbDb {
 
 // ----
 
-struct LmdbTx<'a> {
+pub(crate) struct LmdbTx<'a> {
     trees: &'a [Database],
     tx: RwTxn<'a, 'a>,
 }

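The `to_wrap`/`init` split above lets a caller either get a ready-to-use `Db` handle directly, or keep the concrete `LmdbDb` long enough for another `IDb` implementation to wrap it (which is exactly what `open_db` does further down for the metrics proxy). A minimal sketch of the two construction paths, not part of the diff, assuming the `garage_db` crate with the `lmdb` feature enabled and an already-opened LMDB environment:

```rust
use garage_db::lmdb_adapter::{heed::Env, LmdbDb};
use garage_db::metric_proxy::MetricDbProxy;
use garage_db::Db;

// Plain engine, as before: go straight to a Db handle.
fn open_plain(env: Env) -> Db {
    LmdbDb::init(env)
}

// Wrapped engine: keep the concrete LmdbDb so the metrics proxy can own it.
fn open_with_metrics(env: Env) -> Db {
    MetricDbProxy::init(LmdbDb::to_wrap(env))
}
```
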
src/db/metric_proxy.rs (new file, 217 lines)

@@ -0,0 +1,217 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

use crate::lmdb_adapter::LmdbDb;
use crate::{
    Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
    Value, ValueIter,
};
use opentelemetry::{
    global,
    metrics::{Unit, ValueRecorder},
    KeyValue,
};

pub struct MetricDbProxy {
    //@FIXME Replace with a template
    db: LmdbDb,
    op: ValueRecorder<f64>,
}

impl MetricDbProxy {
    pub fn init(db: LmdbDb) -> Db {
        let meter = global::meter("garage/db");
        let s = Self {
            db,
            op: meter
                .f64_value_recorder("db.op")
                .with_description("Duration and amount of operations on the local metadata engine")
                .with_unit(Unit::new("ms"))
                .init(),
        };
        Db(Arc::new(s))
    }

    fn instrument<T>(
        &self,
        fx: impl FnOnce() -> T,
        op: &'static str,
        cat: &'static str,
        tx: &'static str,
    ) -> T {
        let metric_tags = [
            KeyValue::new("op", op),
            KeyValue::new("cat", cat),
            KeyValue::new("tx", tx),
        ];

        let request_start = Instant::now();
        let res = fx();
        let delay_nanos = Instant::now()
            .saturating_duration_since(request_start)
            .as_nanos();
        let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64;
        self.op.record(delay_millis, &metric_tags);

        res
    }
}

impl IDb for MetricDbProxy {
    fn engine(&self) -> String {
        format!("Metric Proxy on {}", self.db.engine())
    }

    fn open_tree(&self, name: &str) -> Result<usize> {
        self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no")
    }

    fn list_trees(&self) -> Result<Vec<String>> {
        self.instrument(|| self.db.list_trees(), "list_trees", "control", "no")
    }

    fn snapshot(&self, to: &PathBuf) -> Result<()> {
        self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no")
    }

    // ---

    fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
        self.instrument(|| self.db.get(tree, key), "get", "data", "no")
    }

    fn len(&self, tree: usize) -> Result<usize> {
        self.instrument(|| self.db.len(tree), "len", "data", "no")
    }

    fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> {
        self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no")
    }

    fn remove(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
        self.instrument(|| self.db.remove(tree, key), "remove", "data", "no")
    }

    fn clear(&self, tree: usize) -> Result<()> {
        self.instrument(|| self.db.clear(tree), "clear", "data", "no")
    }

    fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
        self.instrument(|| self.db.iter(tree), "iter", "data", "no")
    }

    fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
        self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no")
    }

    fn range<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> Result<ValueIter<'_>> {
        self.instrument(|| self.db.range(tree, low, high), "range", "data", "no")
    }

    fn range_rev<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> Result<ValueIter<'_>> {
        self.instrument(
            || self.db.range_rev(tree, low, high),
            "range_rev",
            "data",
            "no",
        )
    }

    // ----

    fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
        self.instrument(
            || self.db.transaction(&MetricITxFnProxy { f, metrics: self }),
            "transaction",
            "control",
            "yes",
        )
    }
}

struct MetricITxFnProxy<'a> {
    f: &'a dyn ITxFn,
    metrics: &'a MetricDbProxy,
}
impl<'a> ITxFn for MetricITxFnProxy<'a> {
    fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
        self.f.try_on(&mut MetricTxProxy {
            tx,
            metrics: self.metrics,
        })
    }
}

struct MetricTxProxy<'a> {
    tx: &'a mut dyn ITx,
    metrics: &'a MetricDbProxy,
}
impl<'a> ITx for MetricTxProxy<'a> {
    fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
        self.metrics
            .instrument(|| self.tx.get(tree, key), "get", "data", "yes")
    }

    fn len(&self, tree: usize) -> TxOpResult<usize> {
        self.metrics
            .instrument(|| self.tx.len(tree), "len", "data", "yes")
    }

    fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> {
        self.metrics
            .instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes")
    }

    fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
        self.metrics
            .instrument(|| self.tx.remove(tree, key), "remove", "data", "yes")
    }

    fn clear(&mut self, tree: usize) -> TxOpResult<()> {
        self.metrics
            .instrument(|| self.tx.clear(tree), "clear", "data", "yes")
    }

    fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
        self.metrics
            .instrument(|| self.tx.iter(tree), "iter", "data", "yes")
    }
    fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
        self.metrics
            .instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes")
    }
    fn range<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        self.metrics
            .instrument(|| self.tx.range(tree, low, high), "range", "data", "yes")
    }

    fn range_rev<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        self.metrics.instrument(
            || self.tx.range_rev(tree, low, high),
            "range_rev",
            "data",
            "yes",
        )
    }
}

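The core of the proxy is `instrument`: time a closure, record the elapsed milliseconds on a `ValueRecorder` with static tags, and hand the closure's result back unchanged. The same pattern works outside of any Garage types; a standalone sketch against opentelemetry 0.17 (the version pinned in Cargo.nix above), with hypothetical meter and metric names:

```rust
use std::time::Instant;

use opentelemetry::{
    global,
    metrics::{Unit, ValueRecorder},
    KeyValue,
};

struct Timed {
    op: ValueRecorder<f64>,
}

impl Timed {
    fn new() -> Self {
        Self {
            op: global::meter("example")
                .f64_value_recorder("example.op")
                .with_description("Duration of instrumented operations")
                .with_unit(Unit::new("ms"))
                .init(),
        }
    }

    // Same shape as MetricDbProxy::instrument: run the closure, record how
    // long it took in milliseconds, and pass its result straight through.
    fn instrument<T>(&self, fx: impl FnOnce() -> T, op: &'static str) -> T {
        let start = Instant::now();
        let res = fx();
        let millis = start.elapsed().as_secs_f64() * 1_000.0;
        self.op.record(millis, &[KeyValue::new("op", op)]);
        res
    }
}

fn main() {
    let t = Timed::new();
    let sum: u64 = t.instrument(|| (0..1_000_000u64).sum(), "sum");
    println!("{sum}");
}
```
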
src/db/open.rs

@@ -10,6 +10,7 @@ use crate::{Db, Error, Result};
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum Engine {
     Lmdb,
+    LmdbWithMetrics,
     Sqlite,
 }
 
@@ -18,6 +19,7 @@ impl Engine {
     pub fn as_str(&self) -> &'static str {
         match self {
             Self::Lmdb => "lmdb",
+            Self::LmdbWithMetrics => "lmdb-with-metrics",
             Self::Sqlite => "sqlite",
         }
     }
@@ -35,6 +37,7 @@ impl std::str::FromStr for Engine {
     fn from_str(text: &str) -> Result<Engine> {
         match text {
             "lmdb" | "heed" => Ok(Self::Lmdb),
+            "lmdb-with-metrics" | "heed-with-metrics" => Ok(Self::LmdbWithMetrics),
             "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
             "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
             kind => Err(Error(
@@ -74,7 +77,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
 
         // ---- LMDB DB ----
         #[cfg(feature = "lmdb")]
-        Engine::Lmdb => {
+        Engine::Lmdb | Engine::LmdbWithMetrics => {
             info!("Opening LMDB database at: {}", path.display());
             if let Err(e) = std::fs::create_dir_all(&path) {
                 return Err(Error(
@@ -92,6 +95,7 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
             env_builder.map_size(map_size);
             env_builder.max_readers(2048);
             unsafe {
+                env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoRdAhead);
                 env_builder.flag(crate::lmdb_adapter::heed::flags::Flags::MdbNoMetaSync);
                 if !opt.fsync {
                     env_builder.flag(heed::flags::Flags::MdbNoSync);
@@ -109,7 +113,13 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
                     ))
                 }
                 Err(e) => Err(Error(format!("Cannot open LMDB database: {}", e).into())),
-                Ok(db) => Ok(crate::lmdb_adapter::LmdbDb::init(db)),
+                Ok(db) => match engine {
+                    Engine::LmdbWithMetrics => {
+                        let to_wrap = crate::lmdb_adapter::LmdbDb::to_wrap(db);
+                        Ok(crate::metric_proxy::MetricDbProxy::init(to_wrap))
+                    }
+                    _ => Ok(crate::lmdb_adapter::LmdbDb::init(db)),
+                },
             }
         }
 
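With the new variant wired into `as_str` and `from_str`, the wrapped engine is selected purely by name. A quick sketch of how the parsing behaves, not part of the diff, assuming `Engine` is reachable as `garage_db::Engine` (as the `db::Engine::…` paths in the next file suggest):

```rust
use std::str::FromStr;

use garage_db::Engine;

fn main() {
    // Both new aliases resolve to the metrics-wrapped LMDB engine...
    assert!(matches!(Engine::from_str("lmdb-with-metrics"), Ok(Engine::LmdbWithMetrics)));
    assert!(matches!(Engine::from_str("heed-with-metrics"), Ok(Engine::LmdbWithMetrics)));
    // ...and it reports itself under the canonical name.
    assert_eq!(Engine::LmdbWithMetrics.as_str(), "lmdb-with-metrics");
}
```
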
src/model/garage.rs

@@ -121,7 +121,7 @@ impl Garage {
             db::Engine::Sqlite => {
                 db_path.push("db.sqlite");
             }
-            db::Engine::Lmdb => {
+            db::Engine::Lmdb | db::Engine::LmdbWithMetrics => {
                 db_path.push("db.lmdb");
             }
         }

src/util/background/worker.rs

@@ -1,11 +1,13 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
+use std::time::Instant;
 
 use async_trait::async_trait;
 use futures::future::*;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
+use opentelemetry::{global, metrics::ValueRecorder, KeyValue};
 use serde::{Deserialize, Serialize};
 use tokio::select;
 use tokio::sync::{mpsc, watch};
@@ -62,6 +64,7 @@ pub(crate) struct WorkerProcessor {
     stop_signal: watch::Receiver<bool>,
     worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
     worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+    metrics: ValueRecorder<f64>,
 }
 
 impl WorkerProcessor {
@@ -70,10 +73,15 @@ impl WorkerProcessor {
         stop_signal: watch::Receiver<bool>,
         worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
     ) -> Self {
+        let meter = global::meter("garage/util");
         Self {
             stop_signal,
             worker_chan,
             worker_info,
+            metrics: meter
+                .f64_value_recorder("util.worker_step")
+                .with_description("Duration and amount of worker steps executed")
+                .init(),
         }
     }
@@ -103,6 +111,7 @@ impl WorkerProcessor {
                     errors: 0,
                     consecutive_errors: 0,
                     last_error: None,
+                    metrics: self.metrics.clone(),
                 };
                 workers.push(async move {
                     worker.step().await;
@@ -183,10 +192,13 @@ struct WorkerHandler {
     errors: usize,
     consecutive_errors: usize,
     last_error: Option<(String, u64)>,
+    metrics: ValueRecorder<f64>,
 }
 
 impl WorkerHandler {
     async fn step(&mut self) {
+        let request_start = Instant::now();
+        //@FIXME we also want to track errors in metrics but I don't know how yet.
         match self.state {
             WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
                 Ok(s) => {
@@ -229,5 +241,17 @@ impl WorkerHandler {
             }
             WorkerState::Done => unreachable!(),
         }
+
+        // metrics
+        let metric_tags = [
+            KeyValue::new("state", self.state.to_string()),
+            KeyValue::new("name", self.worker.name()),
+            KeyValue::new("id", format!("{}", self.task_id)),
+        ];
+
+        let delay_secs = Instant::now()
+            .saturating_duration_since(request_start)
+            .as_secs_f64();
+        self.metrics.record(delay_secs, &metric_tags);
     }
 }