Compare commits

..

1 commit

Author SHA1 Message Date
8b3edd86e4
Implement a RocksDB backend for benchmarking 2024-11-22 10:19:54 +01:00
18 changed files with 1440 additions and 685 deletions

View file

@ -37,6 +37,8 @@ steps:
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false) - GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true - nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false) - GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=rocksdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- rm result - rm result
- rm -rv tmp-garage-integration - rm -rv tmp-garage-integration

View file

@ -9,11 +9,11 @@ depends_on:
steps: steps:
- name: refresh-index - name: refresh-index
image: nixpkgs/nix:nixos-22.05 image: nixpkgs/nix:nixos-22.05
environment: secrets:
AWS_ACCESS_KEY_ID: - source: garagehq_aws_access_key_id
from_secret: garagehq_aws_access_key_id target: AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY: - source: garagehq_aws_secret_access_key
from_secret: garagehq_aws_secret_access_key target: AWS_SECRET_ACCESS_KEY
commands: commands:
- mkdir -p /etc/nix && cp nix/nix.conf /etc/nix/nix.conf - mkdir -p /etc/nix && cp nix/nix.conf /etc/nix/nix.conf
- nix-shell --attr ci --run "refresh_index" - nix-shell --attr ci --run "refresh_index"

View file

@ -48,10 +48,11 @@ steps:
image: nixpkgs/nix:nixos-22.05 image: nixpkgs/nix:nixos-22.05
environment: environment:
TARGET: "${TARGET}" TARGET: "${TARGET}"
AWS_ACCESS_KEY_ID: secrets:
from_secret: garagehq_aws_access_key_id - source: garagehq_aws_access_key_id
AWS_SECRET_ACCESS_KEY: target: AWS_ACCESS_KEY_ID
from_secret: garagehq_aws_secret_access_key - source: garagehq_aws_secret_access_key
target: AWS_SECRET_ACCESS_KEY
commands: commands:
- nix-shell --attr ci --run "to_s3" - nix-shell --attr ci --run "to_s3"

1432
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -61,6 +61,7 @@ md-5 = "0.10"
mktemp = "0.5" mktemp = "0.5"
nix = { version = "0.27", default-features = false, features = ["fs"] } nix = { version = "0.27", default-features = false, features = ["fs"] }
nom = "7.1" nom = "7.1"
num_cpus = "1.0"
parse_duration = "2.1" parse_duration = "2.1"
pin-project = "1.0.12" pin-project = "1.0.12"
pnet_datalink = "0.34" pnet_datalink = "0.34"
@ -85,6 +86,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
rusqlite = "0.31.0" rusqlite = "0.31.0"
r2d2 = "0.8" r2d2 = "0.8"
r2d2_sqlite = "0.24" r2d2_sqlite = "0.24"
rocksdb = { version = "0.22", features = ["multi-threaded-cf"] }
async-compression = { version = "0.4", features = ["tokio", "zstd"] } async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false } zstd = { version = "0.13", default-features = false }

View file

@ -16,7 +16,6 @@ data_dir = "/var/lib/garage/data"
metadata_fsync = true metadata_fsync = true
data_fsync = false data_fsync = false
disable_scrub = false disable_scrub = false
use_local_tz = false
metadata_auto_snapshot_interval = "6h" metadata_auto_snapshot_interval = "6h"
db_engine = "lmdb" db_engine = "lmdb"
@ -100,7 +99,6 @@ Top-level configuration options:
[`data_fsync`](#data_fsync), [`data_fsync`](#data_fsync),
[`db_engine`](#db_engine), [`db_engine`](#db_engine),
[`disable_scrub`](#disable_scrub), [`disable_scrub`](#disable_scrub),
[`use_local_tz`](#use_local_tz),
[`lmdb_map_size`](#lmdb_map_size), [`lmdb_map_size`](#lmdb_map_size),
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval), [`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
[`metadata_dir`](#metadata_dir), [`metadata_dir`](#metadata_dir),
@ -429,13 +427,6 @@ you should delete it from the data directory and then call `garage repair
blocks` on the node to ensure that it re-obtains a copy from another node on blocks` on the node to ensure that it re-obtains a copy from another node on
the network. the network.
#### `use_local_tz` {#use_local_tz}
By default, Garage runs the lifecycle worker every day at midnight in UTC. Set the
`use_local_tz` configuration value to `true` if you want Garage to run the
lifecycle worker at midnight in your local timezone. If you have multiple nodes,
you should also ensure that each node has the same timezone configuration.
#### `block_size` {#block_size} #### `block_size` {#block_size}
Garage splits stored objects in consecutive chunks of size `block_size` Garage splits stored objects in consecutive chunks of size `block_size`

View file

@ -57,6 +57,22 @@
} }
}, },
"nixpkgs": { "nixpkgs": {
"locked": {
"lastModified": 1724395761,
"narHash": "sha256-zRkDV/nbrnp3Y8oCADf5ETl1sDrdmAW6/bBVJ8EbIdQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "ae815cee91b417be55d43781eb4b73ae1ecc396c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": { "locked": {
"lastModified": 1724681257, "lastModified": 1724681257,
"narHash": "sha256-EJRuc5Qp7yfXko5ZNeEMYAs4DzAvkCyALuJ/tGllhN4=", "narHash": "sha256-EJRuc5Qp7yfXko5ZNeEMYAs4DzAvkCyALuJ/tGllhN4=",
@ -80,15 +96,12 @@
"cargo2nix", "cargo2nix",
"flake-utils" "flake-utils"
], ],
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs_2"
} }
}, },
"rust-overlay": { "rust-overlay": {
"inputs": { "inputs": {
"nixpkgs": [ "nixpkgs": "nixpkgs"
"cargo2nix",
"nixpkgs"
]
}, },
"locked": { "locked": {
"lastModified": 1724638882, "lastModified": 1724638882,
@ -101,7 +114,6 @@
"original": { "original": {
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "19b70f147b9c67a759e35824b241f1ed92e46694",
"type": "github" "type": "github"
} }
} }

View file

@ -15,11 +15,14 @@ path = "lib.rs"
err-derive.workspace = true err-derive.workspace = true
hexdump.workspace = true hexdump.workspace = true
tracing.workspace = true tracing.workspace = true
opentelemetry.workspace = true
heed = { workspace = true, optional = true } heed = { workspace = true, optional = true }
rusqlite = { workspace = true, optional = true, features = ["backup"] } rusqlite = { workspace = true, optional = true, features = ["backup"] }
r2d2 = { workspace = true, optional = true } r2d2 = { workspace = true, optional = true }
r2d2_sqlite = { workspace = true, optional = true } r2d2_sqlite = { workspace = true, optional = true }
rocksdb = { workspace = true, optional = true }
num_cpus = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
mktemp.workspace = true mktemp.workspace = true
@ -29,3 +32,4 @@ default = [ "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ] bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ] lmdb = [ "heed" ]
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ] sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
rocksdb = [ "dep:rocksdb", "dep:num_cpus" ]

View file

@ -5,6 +5,10 @@ extern crate tracing;
pub mod lmdb_adapter; pub mod lmdb_adapter;
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
pub mod sqlite_adapter; pub mod sqlite_adapter;
#[cfg(feature = "rocksdb")]
pub mod rocksdb_adapter;
#[cfg(feature = "rocksdb")]
pub mod metric_proxy;
pub mod open; pub mod open;

217
src/db/metric_proxy.rs Normal file
View file

@ -0,0 +1,217 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use crate::rocksdb_adapter::RocksDb;
use crate::{
Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
Value, ValueIter,
};
use opentelemetry::{
global,
metrics::{Unit, ValueRecorder},
KeyValue,
};
/// Database engine proxy that forwards every call to an inner `RocksDb`
/// while recording the latency of each operation as an OpenTelemetry metric.
pub struct MetricDbProxy {
	//@FIXME Replace with a template
	// Underlying RocksDB engine that actually performs the operations.
	db: RocksDb,
	// Recorder for the duration (in ms) of each operation, tagged by
	// op (method name), cat (data/control) and tx (inside a transaction or not).
	op: ValueRecorder<f64>,
}
impl MetricDbProxy {
	/// Wrap `db` in a metric-recording proxy and return it as a generic `Db`.
	pub fn init(db: RocksDb) -> Db {
		let meter = global::meter("garage/db");
		let s = Self {
			db,
			op: meter
				.f64_value_recorder("db.op")
				.with_description("Duration and amount of operations on the local metadata engine")
				.with_unit(Unit::new("ms"))
				.init(),
		};
		Db(Arc::new(s))
	}

	/// Run `fx`, record its wall-clock duration (in milliseconds) under the
	/// `db.op` metric with the given `op`/`cat`/`tx` tags, and return its result.
	fn instrument<T>(
		&self,
		fx: impl FnOnce() -> T,
		op: &'static str,
		cat: &'static str,
		tx: &'static str,
	) -> T {
		let metric_tags = [
			KeyValue::new("op", op),
			KeyValue::new("cat", cat),
			KeyValue::new("tx", tx),
		];
		let request_start = Instant::now();
		let res = fx();
		// `Instant::elapsed` is monotonic and already saturating, so no
		// explicit `saturating_duration_since` dance is needed.
		let delay_millis = request_start.elapsed().as_nanos() as f64 / 1_000_000f64;
		self.op.record(delay_millis, &metric_tags);
		res
	}
}
/// `IDb` implementation that simply delegates every operation to the inner
/// RocksDB engine, wrapped in `instrument` so each call is timed.
/// Tags: cat = "control" for tree management, "data" for key/value ops;
/// tx = "no" since these run outside a transaction.
impl IDb for MetricDbProxy {
	fn engine(&self) -> String {
		format!("Metric Proxy on {}", self.db.engine())
	}

	fn open_tree(&self, name: &str) -> Result<usize> {
		self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no")
	}

	fn list_trees(&self) -> Result<Vec<String>> {
		self.instrument(|| self.db.list_trees(), "list_trees", "control", "no")
	}

	fn snapshot(&self, to: &PathBuf) -> Result<()> {
		self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no")
	}

	// --- data operations (timed with cat = "data") ---

	fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
		self.instrument(|| self.db.get(tree, key), "get", "data", "no")
	}

	fn len(&self, tree: usize) -> Result<usize> {
		self.instrument(|| self.db.len(tree), "len", "data", "no")
	}

	fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
		self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no")
	}

	fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
		self.instrument(|| self.db.remove(tree, key), "remove", "data", "no")
	}

	fn clear(&self, tree: usize) -> Result<()> {
		self.instrument(|| self.db.clear(tree), "clear", "data", "no")
	}

	// NOTE: only the call that *creates* the iterator is timed here, not the
	// consumption of the returned iterator itself.
	fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
		self.instrument(|| self.db.iter(tree), "iter", "data", "no")
	}

	fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
		self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no")
	}

	fn range<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> Result<ValueIter<'_>> {
		self.instrument(|| self.db.range(tree, low, high), "range", "data", "no")
	}

	fn range_rev<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> Result<ValueIter<'_>> {
		self.instrument(
			|| self.db.range_rev(tree, low, high),
			"range_rev",
			"data",
			"no",
		)
	}

	// ----

	fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
		// The transaction body is wrapped in `MetricITxFnProxy` so that every
		// operation performed *inside* the transaction is also instrumented.
		self.instrument(
			|| self.db.transaction(&MetricITxFnProxy { f, metrics: self }),
			"transaction",
			"control",
			"yes",
		)
	}
}
/// Wraps a user-supplied transaction body (`ITxFn`) so that the transaction
/// handle it receives is itself instrumented (see `MetricTxProxy`).
struct MetricITxFnProxy<'a> {
	// The real transaction body to execute.
	f: &'a dyn ITxFn,
	// Parent proxy, providing the `instrument` helper and metric recorder.
	metrics: &'a MetricDbProxy,
}

impl<'a> ITxFn for MetricITxFnProxy<'a> {
	fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
		// Hand the body a proxied `ITx` so every operation inside the
		// transaction is timed with the tag tx = "yes".
		self.f.try_on(&mut MetricTxProxy {
			tx,
			metrics: self.metrics,
		})
	}
}
/// `ITx` proxy handed to transaction bodies: forwards every transactional
/// operation to the wrapped transaction while timing it (tx tag = "yes").
struct MetricTxProxy<'a> {
	// The real transaction being proxied.
	tx: &'a mut dyn ITx,
	// Parent proxy, providing the `instrument` helper and metric recorder.
	metrics: &'a MetricDbProxy,
}

impl<'a> ITx for MetricTxProxy<'a> {
	fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
		self.metrics
			.instrument(|| self.tx.get(tree, key), "get", "data", "yes")
	}

	fn len(&self, tree: usize) -> TxOpResult<usize> {
		self.metrics
			.instrument(|| self.tx.len(tree), "len", "data", "yes")
	}

	fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
		self.metrics
			.instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes")
	}

	fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
		self.metrics
			.instrument(|| self.tx.remove(tree, key), "remove", "data", "yes")
	}

	fn clear(&mut self, tree: usize) -> TxOpResult<()> {
		self.metrics
			.instrument(|| self.tx.clear(tree), "clear", "data", "yes")
	}

	// NOTE: only iterator *creation* is timed, not its consumption.
	fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
		self.metrics
			.instrument(|| self.tx.iter(tree), "iter", "data", "yes")
	}

	fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
		self.metrics
			.instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes")
	}

	fn range<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> TxOpResult<TxValueIter<'_>> {
		self.metrics
			.instrument(|| self.tx.range(tree, low, high), "range", "data", "yes")
	}

	fn range_rev<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> TxOpResult<TxValueIter<'_>> {
		self.metrics.instrument(
			|| self.tx.range_rev(tree, low, high),
			"range_rev",
			"data",
			"yes",
		)
	}
}

View file

@ -11,6 +11,8 @@ use crate::{Db, Error, Result};
pub enum Engine { pub enum Engine {
Lmdb, Lmdb,
Sqlite, Sqlite,
RocksDb,
RocksDbWithMetrics,
} }
impl Engine { impl Engine {
@ -19,6 +21,8 @@ impl Engine {
match self { match self {
Self::Lmdb => "lmdb", Self::Lmdb => "lmdb",
Self::Sqlite => "sqlite", Self::Sqlite => "sqlite",
Self::RocksDb => "rocksdb",
Self::RocksDbWithMetrics => "rocksdb-with-metrics",
} }
} }
} }
@ -36,6 +40,8 @@ impl std::str::FromStr for Engine {
match text { match text {
"lmdb" | "heed" => Ok(Self::Lmdb), "lmdb" | "heed" => Ok(Self::Lmdb),
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite), "sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
"rocksdb" => Ok(Self::RocksDb),
"rocksdb-with-metrics" => Ok(Self::RocksDbWithMetrics),
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())), "sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
kind => Err(Error( kind => Err(Error(
format!( format!(
@ -114,6 +120,31 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
} }
} }
// ---- RocksDB ----
#[cfg(feature = "rocksdb")]
Engine::RocksDb => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
Ok(crate::rocksdb_adapter::RocksDb::init(options, txn_options, path))
}
// ---- RocksDB ----
#[cfg(feature = "rocksdb")]
Engine::RocksDbWithMetrics => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
let db = crate::rocksdb_adapter::RocksDb::new(options, txn_options, path);
Ok(crate::metric_proxy::MetricDbProxy::init(db))
}
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow // Pattern is unreachable when all supported DB engines are compiled into binary. The allow
// attribute is added so that we won't have to change this match in case stop building // attribute is added so that we won't have to change this match in case stop building
// support for one or more engines by default. // support for one or more engines by default.

322
src/db/rocksdb_adapter.rs Normal file
View file

@ -0,0 +1,322 @@
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use core::ops::Bound;
use rocksdb::{self as rks, BoundColumnFamily, Direction, IteratorMode, MultiThreaded, Options, Transaction, TransactionDB, TransactionDBOptions};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
/// RocksDB-backed implementation of the `IDb` database abstraction.
/// Each "tree" is mapped to a RocksDB column family; the `usize` tree handle
/// is an index into the ordered list of column family names kept below.
pub struct RocksDb {
	db: TransactionDB<MultiThreaded>,
	// (ordered list of CF names, name -> index map), guarded by a single lock
	// so `open_tree` can atomically append a new CF and register its index.
	column_families: RwLock<(Vec<String>, HashMap<String, usize>)>,
}
impl RocksDb {
	/// Open (or create) the RocksDB database at `path` and wrap it as a generic `Db`.
	pub fn init(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> Db {
		Db(Arc::new(Self::new(options, txn_options, path)))
	}

	/// Open (or create) the RocksDB database at `path`.
	///
	/// RocksDB refuses to open a database without specifying all of its
	/// existing column families, so we list them first and open with the full
	/// set; on a fresh database, `list_cf` fails and we fall back to the
	/// implicit "default" column family.
	///
	/// # Panics
	/// Panics if the database cannot be opened (the `IDb` constructors do not
	/// return a `Result`).
	pub fn new(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> RocksDb {
		let existing_cf = TransactionDB::<MultiThreaded>::list_cf(&Options::default(), path)
			.unwrap_or_else(|_| vec!["default".to_string()]);
		let db = TransactionDB::open_cf(&options, &txn_options, path, &existing_cf)
			.expect("unable to open RocksDB database");
		let existing_cf_map = existing_cf
			.iter()
			.enumerate()
			.map(|(i, n)| (n.clone(), i))
			.collect();
		Self {
			db,
			column_families: RwLock::new((existing_cf, existing_cf_map)),
		}
	}

	/// Resolve a tree handle into a live column family handle.
	fn get_cf_handle(&self, tree: usize) -> Result<Arc<BoundColumnFamily>> {
		let column_families = self.column_families.read().unwrap();
		column_families
			.0
			.get(tree)
			.and_then(|n| self.db.cf_handle(n))
			.ok_or_else(|| {
				Error("trying to acquire a handle on a non-existing column family".into())
			})
	}
}
impl IDb for RocksDb {
fn engine(&self) -> String {
"rocksdb".into()
}
fn open_tree(&self, name: &str) -> Result<usize> {
let mut column_families = self.column_families.write().unwrap();
let open_tree = column_families.1.get(name);
if let Some(i) = open_tree {
Ok(*i)
} else {
self.db.create_cf(name, &Options::default())?;
column_families.0.push(name.to_string());
let i = column_families.0.len() - 1;
column_families.1.insert(name.to_string(), i);
Ok(i)
}
}
fn list_trees(&self) -> Result<Vec<String>> {
Ok(self.column_families.read().unwrap().0.clone())
}
fn snapshot(&self, path: &PathBuf) -> Result<()> {
todo!("snapshots for RocksDB");
}
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.get_cf(&cf_handle, key)?)
}
fn len(&self, tree: usize) -> Result<usize> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.iterator_cf(&cf_handle, IteratorMode::Start).count())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.put_cf(&cf_handle, key, value)?)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.delete_cf(&cf_handle, key)?)
}
fn clear(&self, tree: usize) -> Result<()> {
let column_families = self.column_families.write().unwrap(); // locking against open_tree
let tree_name = column_families.0.get(tree).ok_or(
Error("trying to clear a non-existing column family".into())
)?;
self.db.drop_cf(tree_name)?;
self.db.create_cf(tree_name, &Options::default())?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::Start)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::End)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match low {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
Bound::Unbounded => IteratorMode::Start,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match high {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
))
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match high {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
Bound::Unbounded => IteratorMode::End,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match low {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
))
}
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let txn = self.db.transaction();
let mut tx = RocksTx {
db: &self,
txn,
};
match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
tx.txn.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(on_commit)
}
TxFnResult::Abort => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Abort(()))
}
TxFnResult::DbErr => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Db(Error(
"(this message will be discarded)".into(),
)))
}
}
}
}
/// A pending RocksDB transaction: borrows the parent `RocksDb` (for column
/// family handle lookups) and owns the underlying RocksDB `Transaction`.
pub struct RocksTx<'a> {
	db: &'a RocksDb,
	txn: Transaction<'a, TransactionDB<MultiThreaded>>,
}
impl<'a> ITx for RocksTx<'a> {
	fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(self.txn.get_cf(&cf_handle, key)?)
	}

	/// NOTE: O(n) full scan of the tree within the transaction's snapshot.
	fn len(&self, tree: usize) -> TxOpResult<usize> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(self.txn.iterator_cf(&cf_handle, IteratorMode::Start).count())
	}

	fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(self.txn.put_cf(&cf_handle, key, value)?)
	}

	fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(self.txn.delete_cf(&cf_handle, key)?)
	}

	/// Dropping/re-creating a column family cannot be done transactionally.
	fn clear(&mut self, _tree: usize) -> TxOpResult<()> {
		unimplemented!("transactional column family clear not supported in RocksDB")
	}

	fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(Box::new(
			self.txn
				.iterator_cf(&cf_handle, IteratorMode::Start)
				.map(|r| {
					r.map(|(k, v)| (k.into_vec(), v.into_vec()))
						.map_err(|e| e.into())
				}),
		))
	}

	fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		Ok(Box::new(
			self.txn
				.iterator_cf(&cf_handle, IteratorMode::End)
				.map(|r| {
					r.map(|(k, v)| (k.into_vec(), v.into_vec()))
						.map_err(|e| e.into())
				}),
		))
	}

	/// Ascending iteration over `[low, high]` with proper `Bound` semantics:
	/// `IteratorMode::From` is inclusive of its starting key, so an excluded
	/// lower bound must additionally skip the bound key itself, and an
	/// included upper bound must not be cut off by a strict comparison.
	fn range<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> TxOpResult<TxValueIter<'_>> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		let (start_mode, skip_key) = match low {
			Bound::Included(i) => (IteratorMode::From(i, Direction::Forward), None),
			Bound::Excluded(i) => (IteratorMode::From(i, Direction::Forward), Some(i.to_vec())),
			Bound::Unbounded => (IteratorMode::Start, None),
		};
		let base_iterator = self
			.txn
			.iterator_cf(&cf_handle, start_mode)
			.map(|r| {
				r.map(|(k, v)| (k.into_vec(), v.into_vec()))
					.map_err(|e| e.into())
			})
			// Drop the bound key itself when the lower bound is excluded.
			.skip_while(move |r| match (&skip_key, r) {
				(Some(skip), Ok((k, _))) => k == skip,
				_ => false,
			});
		// On an Err item, let it through (`unwrap_or(true)`) instead of
		// silently truncating the iteration and swallowing the error.
		match high {
			Bound::Included(i) => {
				let stop = i.to_vec();
				Ok(Box::new(base_iterator.take_while(move |r| {
					r.as_ref().map(|(k, _)| *k <= stop).unwrap_or(true)
				})))
			}
			Bound::Excluded(i) => {
				let stop = i.to_vec();
				Ok(Box::new(base_iterator.take_while(move |r| {
					r.as_ref().map(|(k, _)| *k < stop).unwrap_or(true)
				})))
			}
			Bound::Unbounded => Ok(Box::new(base_iterator)),
		}
	}

	/// Descending iteration over `[low, high]`; mirror image of `range`.
	fn range_rev<'r>(
		&self,
		tree: usize,
		low: Bound<&'r [u8]>,
		high: Bound<&'r [u8]>,
	) -> TxOpResult<TxValueIter<'_>> {
		let cf_handle = self.db.get_cf_handle(tree)?;
		let (start_mode, skip_key) = match high {
			Bound::Included(i) => (IteratorMode::From(i, Direction::Reverse), None),
			Bound::Excluded(i) => (IteratorMode::From(i, Direction::Reverse), Some(i.to_vec())),
			Bound::Unbounded => (IteratorMode::End, None),
		};
		let base_iterator = self
			.txn
			.iterator_cf(&cf_handle, start_mode)
			.map(|r| {
				r.map(|(k, v)| (k.into_vec(), v.into_vec()))
					.map_err(|e| e.into())
			})
			// Drop the bound key itself when the upper bound is excluded.
			.skip_while(move |r| match (&skip_key, r) {
				(Some(skip), Ok((k, _))) => k == skip,
				_ => false,
			});
		match low {
			Bound::Included(i) => {
				let stop = i.to_vec();
				Ok(Box::new(base_iterator.take_while(move |r| {
					r.as_ref().map(|(k, _)| *k >= stop).unwrap_or(true)
				})))
			}
			Bound::Excluded(i) => {
				let stop = i.to_vec();
				Ok(Box::new(base_iterator.take_while(move |r| {
					r.as_ref().map(|(k, _)| *k > stop).unwrap_or(true)
				})))
			}
			Bound::Unbounded => Ok(Box::new(base_iterator)),
		}
	}
}
impl From<rks::Error> for Error {
fn from(e: rks::Error) -> Error {
Error(format!("RocksDB: {}", e).into())
}
}
impl From<rks::Error> for TxOpError {
fn from(e: rks::Error) -> TxOpError {
TxOpError(e.into())
}
}
impl From<Error> for TxOpError {
fn from(e: Error) -> TxOpError {
TxOpError(e.into())
}
}

View file

@ -89,6 +89,7 @@ k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines # Database engines
lmdb = [ "garage_model/lmdb" ] lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ] sqlite = [ "garage_model/sqlite" ]
rocksdb = [ "garage_model/rocksdb" ]
# Automatic registration and discovery via Consul API # Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ] consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@ -47,3 +47,4 @@ default = [ "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ] k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ] lmdb = [ "garage_db/lmdb" ]
sqlite = [ "garage_db/sqlite" ] sqlite = [ "garage_db/sqlite" ]
rocksdb = [ "garage_db/rocksdb" ]

View file

@ -124,6 +124,9 @@ impl Garage {
db::Engine::Lmdb => { db::Engine::Lmdb => {
db_path.push("db.lmdb"); db_path.push("db.lmdb");
} }
db::Engine::RocksDb | db::Engine::RocksDbWithMetrics => {
db_path.push("db.rocksdb");
}
} }
let db_opt = db::OpenOpt { let db_opt = db::OpenOpt {
fsync: config.metadata_fsync, fsync: config.metadata_fsync,

View file

@ -70,7 +70,7 @@ pub fn register_bg_vars(
impl LifecycleWorker { impl LifecycleWorker {
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self { pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
let today = today(garage.config.use_local_tz); let today = today();
let last_completed = persister.get_with(|x| { let last_completed = persister.get_with(|x| {
x.last_completed x.last_completed
.as_deref() .as_deref()
@ -205,9 +205,8 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState { async fn wait_for_work(&mut self) -> WorkerState {
match &self.state { match &self.state {
State::Completed(d) => { State::Completed(d) => {
let use_local_tz = self.garage.config.use_local_tz;
let next_day = d.succ_opt().expect("no next day"); let next_day = d.succ_opt().expect("no next day");
let next_start = midnight_ts(next_day, use_local_tz); let next_start = midnight_ts(next_day);
loop { loop {
let now = now_msec(); let now = now_msec();
if now < next_start { if now < next_start {
@ -219,7 +218,7 @@ impl Worker for LifecycleWorker {
break; break;
} }
} }
self.state = State::start(std::cmp::max(next_day, today(use_local_tz))); self.state = State::start(std::cmp::max(next_day, today()));
} }
State::Running { .. } => (), State::Running { .. } => (),
} }
@ -386,16 +385,10 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
true true
} }
fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 { fn midnight_ts(date: NaiveDate) -> u64 {
let midnight = date.and_hms_opt(0, 0, 0).expect("midnight does not exist"); date.and_hms_opt(0, 0, 0)
if use_local_tz { .expect("midnight does not exist")
return midnight .timestamp_millis() as u64
.and_local_timezone(Local)
.single()
.expect("bad local midnight")
.timestamp_millis() as u64;
}
midnight.timestamp_millis() as u64
} }
fn next_date(ts: u64) -> NaiveDate { fn next_date(ts: u64) -> NaiveDate {
@ -406,9 +399,6 @@ fn next_date(ts: u64) -> NaiveDate {
.expect("no next day") .expect("no next day")
} }
fn today(use_local_tz: bool) -> NaiveDate { fn today() -> NaiveDate {
if use_local_tz {
return Local::now().naive_local().date();
}
Utc::now().naive_utc().date() Utc::now().naive_utc().date()
} }

View file

@ -807,16 +807,6 @@ impl NodeStatus {
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) { fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
use nix::sys::statvfs::statvfs; use nix::sys::statvfs::statvfs;
// The HashMap used below requires a filesystem identifier from statfs (instead of statvfs) on FreeBSD, as
// FreeBSD's statvfs filesystem identifier is "not meaningful in this implementation" (man 3 statvfs).
#[cfg(target_os = "freebsd")]
let get_filesystem_id = |path: &Path| match nix::sys::statfs::statfs(path) {
Ok(fs) => Some(fs.filesystem_id()),
Err(_) => None,
};
let mount_avail = |path: &Path| match statvfs(path) { let mount_avail = |path: &Path| match statvfs(path) {
Ok(x) => { Ok(x) => {
let avail = x.blocks_available() as u64 * x.fragment_size() as u64; let avail = x.blocks_available() as u64 * x.fragment_size() as u64;
@ -827,7 +817,6 @@ impl NodeStatus {
}; };
self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t)); self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t));
self.data_disk_avail = match data_dir { self.data_disk_avail = match data_dir {
DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)), DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)),
DataDirEnum::Multiple(dirs) => (|| { DataDirEnum::Multiple(dirs) => (|| {
@ -838,25 +827,12 @@ impl NodeStatus {
if dir.capacity.is_none() { if dir.capacity.is_none() {
continue; continue;
} }
#[cfg(not(target_os = "freebsd"))]
match mount_avail(&dir.path) { match mount_avail(&dir.path) {
Some((fsid, avail, total)) => { Some((fsid, avail, total)) => {
mounts.insert(fsid, (avail, total)); mounts.insert(fsid, (avail, total));
} }
None => return None, None => return None,
} }
#[cfg(target_os = "freebsd")]
match get_filesystem_id(&dir.path) {
Some(fsid) => match mount_avail(&dir.path) {
Some((_, avail, total)) => {
mounts.insert(fsid, (avail, total));
}
None => return None,
},
None => return None,
}
} }
Some( Some(
mounts mounts

View file

@ -27,10 +27,6 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub disable_scrub: bool, pub disable_scrub: bool,
/// Use local timezone
#[serde(default)]
pub use_local_tz: bool,
/// Automatic snapshot interval for metadata /// Automatic snapshot interval for metadata
#[serde(default)] #[serde(default)]
pub metadata_auto_snapshot_interval: Option<String>, pub metadata_auto_snapshot_interval: Option<String>,