Implement a RocksDB backend for benchmarking

This commit is contained in:
Withings 2024-10-23 11:46:37 +02:00
parent a18b3f0d1f
commit bede3b5834
Signed by: withings
GPG key ID: 7778B323E465AABB
11 changed files with 1404 additions and 615 deletions

View file

@ -37,6 +37,8 @@ steps:
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=rocksdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- rm result
- rm -rv tmp-garage-integration

1432
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -61,6 +61,7 @@ md-5 = "0.10"
mktemp = "0.5"
nix = { version = "0.27", default-features = false, features = ["fs"] }
nom = "7.1"
num_cpus = "1.0"
parse_duration = "2.1"
pin-project = "1.0.12"
pnet_datalink = "0.34"
@ -85,6 +86,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
rocksdb = { version = "0.22", features = ["multi-threaded-cf"] }
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View file

@ -15,11 +15,14 @@ path = "lib.rs"
err-derive.workspace = true
hexdump.workspace = true
tracing.workspace = true
opentelemetry.workspace = true
heed = { workspace = true, optional = true }
rusqlite = { workspace = true, optional = true, features = ["backup"] }
r2d2 = { workspace = true, optional = true }
r2d2_sqlite = { workspace = true, optional = true }
rocksdb = { workspace = true, optional = true }
num_cpus = { workspace = true, optional = true }
[dev-dependencies]
mktemp.workspace = true
@ -29,3 +32,4 @@ default = [ "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ]
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
rocksdb = [ "dep:rocksdb", "dep:num_cpus" ]

View file

@ -5,6 +5,10 @@ extern crate tracing;
pub mod lmdb_adapter;
#[cfg(feature = "sqlite")]
pub mod sqlite_adapter;
#[cfg(feature = "rocksdb")]
pub mod rocksdb_adapter;
#[cfg(feature = "rocksdb")]
pub mod metric_proxy;
pub mod open;

217
src/db/metric_proxy.rs Normal file
View file

@ -0,0 +1,217 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use crate::rocksdb_adapter::RocksDb;
use crate::{
Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
Value, ValueIter,
};
use opentelemetry::{
global,
metrics::{Unit, ValueRecorder},
KeyValue,
};
pub struct MetricDbProxy {
//@FIXME Replace with a template
db: RocksDb,
op: ValueRecorder<f64>,
}
impl MetricDbProxy {
pub fn init(db: RocksDb) -> Db {
let meter = global::meter("garage/db");
let s = Self {
db,
op: meter
.f64_value_recorder("db.op")
.with_description("Duration and amount of operations on the local metadata engine")
.with_unit(Unit::new("ms"))
.init(),
};
Db(Arc::new(s))
}
fn instrument<T>(
&self,
fx: impl FnOnce() -> T,
op: &'static str,
cat: &'static str,
tx: &'static str,
) -> T {
let metric_tags = [
KeyValue::new("op", op),
KeyValue::new("cat", cat),
KeyValue::new("tx", tx),
];
let request_start = Instant::now();
let res = fx();
let delay_nanos = Instant::now()
.saturating_duration_since(request_start)
.as_nanos();
let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64;
self.op.record(delay_millis, &metric_tags);
res
}
}
impl IDb for MetricDbProxy {
fn engine(&self) -> String {
format!("Metric Proxy on {}", self.db.engine())
}
fn open_tree(&self, name: &str) -> Result<usize> {
self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no")
}
fn list_trees(&self) -> Result<Vec<String>> {
self.instrument(|| self.db.list_trees(), "list_trees", "control", "no")
}
fn snapshot(&self, to: &PathBuf) -> Result<()> {
self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no")
}
// ---
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
self.instrument(|| self.db.get(tree, key), "get", "data", "no")
}
fn len(&self, tree: usize) -> Result<usize> {
self.instrument(|| self.db.len(tree), "len", "data", "no")
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no")
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
self.instrument(|| self.db.remove(tree, key), "remove", "data", "no")
}
fn clear(&self, tree: usize) -> Result<()> {
self.instrument(|| self.db.clear(tree), "clear", "data", "no")
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
self.instrument(|| self.db.iter(tree), "iter", "data", "no")
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no")
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
self.instrument(|| self.db.range(tree, low, high), "range", "data", "no")
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
self.instrument(
|| self.db.range_rev(tree, low, high),
"range_rev",
"data",
"no",
)
}
// ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
self.instrument(
|| self.db.transaction(&MetricITxFnProxy { f, metrics: self }),
"transaction",
"control",
"yes",
)
}
}
struct MetricITxFnProxy<'a> {
f: &'a dyn ITxFn,
metrics: &'a MetricDbProxy,
}
impl<'a> ITxFn for MetricITxFnProxy<'a> {
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
self.f.try_on(&mut MetricTxProxy {
tx,
metrics: self.metrics,
})
}
}
struct MetricTxProxy<'a> {
tx: &'a mut dyn ITx,
metrics: &'a MetricDbProxy,
}
impl<'a> ITx for MetricTxProxy<'a> {
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
self.metrics
.instrument(|| self.tx.get(tree, key), "get", "data", "yes")
}
fn len(&self, tree: usize) -> TxOpResult<usize> {
self.metrics
.instrument(|| self.tx.len(tree), "len", "data", "yes")
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
self.metrics
.instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes")
}
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
self.metrics
.instrument(|| self.tx.remove(tree, key), "remove", "data", "yes")
}
fn clear(&mut self, tree: usize) -> TxOpResult<()> {
self.metrics
.instrument(|| self.tx.clear(tree), "clear", "data", "yes")
}
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
self.metrics
.instrument(|| self.tx.iter(tree), "iter", "data", "yes")
}
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
self.metrics
.instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes")
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
self.metrics
.instrument(|| self.tx.range(tree, low, high), "range", "data", "yes")
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
self.metrics.instrument(
|| self.tx.range_rev(tree, low, high),
"range_rev",
"data",
"yes",
)
}
}

View file

@ -11,6 +11,8 @@ use crate::{Db, Error, Result};
pub enum Engine {
Lmdb,
Sqlite,
RocksDb,
RocksDbWithMetrics,
}
impl Engine {
@ -19,6 +21,8 @@ impl Engine {
match self {
Self::Lmdb => "lmdb",
Self::Sqlite => "sqlite",
Self::RocksDb => "rocksdb",
Self::RocksDbWithMetrics => "rocksdb-with-metrics",
}
}
}
@ -36,6 +40,8 @@ impl std::str::FromStr for Engine {
match text {
"lmdb" | "heed" => Ok(Self::Lmdb),
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
"rocksdb" => Ok(Self::RocksDb),
"rocksdb-with-metrics" => Ok(Self::RocksDbWithMetrics),
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
kind => Err(Error(
format!(
@ -114,6 +120,31 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
}
}
// ---- RocksDB ----
#[cfg(feature = "rocksdb")]
Engine::RocksDb => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
Ok(crate::rocksdb_adapter::RocksDb::init(options, txn_options, path))
}
// ---- RocksDB ----
#[cfg(feature = "rocksdb")]
Engine::RocksDbWithMetrics => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
let db = crate::rocksdb_adapter::RocksDb::new(options, txn_options, path);
Ok(crate::metric_proxy::MetricDbProxy::init(db))
}
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow
// attribute is added so that we won't have to change this match in case stop building
// support for one or more engines by default.

322
src/db/rocksdb_adapter.rs Normal file
View file

@ -0,0 +1,322 @@
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use core::ops::Bound;
use rocksdb::{self as rks, BoundColumnFamily, Direction, IteratorMode, MultiThreaded, Options, Transaction, TransactionDB, TransactionDBOptions};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
pub struct RocksDb {
db: TransactionDB<MultiThreaded>,
column_families: RwLock<(Vec<String>, HashMap<String, usize>)>,
}
impl RocksDb {
pub fn init(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> Db {
Db(Arc::new(Self::new(options, txn_options, path)))
}
pub fn new(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> RocksDb {
let db = TransactionDB::open(&options, &txn_options, path).unwrap();
let existing_cf = TransactionDB::<MultiThreaded>::list_cf(&Options::default(), path).unwrap();
let existing_cf_map = existing_cf.iter().enumerate().map(|(i, n)| (n.clone(), i)).collect();
Self {
db,
column_families: RwLock::new((existing_cf, existing_cf_map)),
}
}
fn get_cf_handle(&self, tree: usize) -> Result<Arc<BoundColumnFamily>> {
let column_families = self.column_families.read().unwrap();
let name = column_families.0.get(tree);
name.map(|n| self.db.cf_handle(n)).flatten().ok_or(
Error("trying to acquire a handle on a non-existing column family".into())
)
}
}
impl IDb for RocksDb {
fn engine(&self) -> String {
"rocksdb".into()
}
fn open_tree(&self, name: &str) -> Result<usize> {
let mut column_families = self.column_families.write().unwrap();
let open_tree = column_families.1.get(name);
if let Some(i) = open_tree {
Ok(*i)
} else {
self.db.create_cf(name, &Options::default())?;
column_families.0.push(name.to_string());
let i = column_families.0.len() - 1;
column_families.1.insert(name.to_string(), i);
Ok(i)
}
}
fn list_trees(&self) -> Result<Vec<String>> {
Ok(self.column_families.read().unwrap().0.clone())
}
fn snapshot(&self, path: &PathBuf) -> Result<()> {
todo!("snapshots for RocksDB");
}
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.get_cf(&cf_handle, key)?)
}
fn len(&self, tree: usize) -> Result<usize> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.iterator_cf(&cf_handle, IteratorMode::Start).count())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.put_cf(&cf_handle, key, value)?)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.delete_cf(&cf_handle, key)?)
}
fn clear(&self, tree: usize) -> Result<()> {
let column_families = self.column_families.write().unwrap(); // locking against open_tree
let tree_name = column_families.0.get(tree).ok_or(
Error("trying to clear a non-existing column family".into())
)?;
self.db.drop_cf(tree_name)?;
self.db.create_cf(tree_name, &Options::default())?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::Start)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::End)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match low {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
Bound::Unbounded => IteratorMode::Start,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match high {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
))
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match high {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
Bound::Unbounded => IteratorMode::End,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match low {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
))
}
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let txn = self.db.transaction();
let mut tx = RocksTx {
db: &self,
txn,
};
match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
tx.txn.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(on_commit)
}
TxFnResult::Abort => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Abort(()))
}
TxFnResult::DbErr => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Db(Error(
"(this message will be discarded)".into(),
)))
}
}
}
}
pub struct RocksTx<'a> {
db: &'a RocksDb,
txn: Transaction<'a, TransactionDB<MultiThreaded>>,
}
impl<'a> ITx for RocksTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.get_cf(&cf_handle, key)?)
}
fn len(&self, tree: usize) -> TxOpResult<usize> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.iterator_cf(&cf_handle, IteratorMode::Start).count())
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.put_cf(&cf_handle, key, value)?)
}
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.delete_cf(&cf_handle, key)?)
}
fn clear(&mut self, _tree: usize) -> TxOpResult<()> {
unimplemented!("transactional column family clear not supported in RocksDB")
}
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(Box::new(
self.txn.iterator_cf(&cf_handle, IteratorMode::Start)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(Box::new(
self.txn.iterator_cf(&cf_handle, IteratorMode::End)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
let start_mode = match low {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
Bound::Unbounded => IteratorMode::Start,
};
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match high {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
))
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
let start_mode = match high {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
Bound::Unbounded => IteratorMode::End,
};
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match low {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
))
}
}
impl From<rks::Error> for Error {
fn from(e: rks::Error) -> Error {
Error(format!("RocksDB: {}", e).into())
}
}
impl From<rks::Error> for TxOpError {
fn from(e: rks::Error) -> TxOpError {
TxOpError(e.into())
}
}
impl From<Error> for TxOpError {
fn from(e: Error) -> TxOpError {
TxOpError(e.into())
}
}

View file

@ -89,6 +89,7 @@ k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
rocksdb = [ "garage_model/rocksdb" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@ -47,3 +47,4 @@ default = [ "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ]
sqlite = [ "garage_db/sqlite" ]
rocksdb = [ "garage_db/rocksdb" ]

View file

@ -124,6 +124,9 @@ impl Garage {
db::Engine::Lmdb => {
db_path.push("db.lmdb");
}
db::Engine::RocksDb | db::Engine::RocksDbWithMetrics => {
db_path.push("db.rocksdb");
}
}
let db_opt = db::OpenOpt {
fsync: config.metadata_fsync,