Compare commits

..

1 commit

Author SHA1 Message Date
8b3edd86e4
Implement a RocksDB backend for benchmarking 2024-11-22 10:19:54 +01:00
18 changed files with 1440 additions and 685 deletions

View file

@ -37,6 +37,8 @@ steps:
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- nix-shell --attr ci --run "killall -9 garage" || true
- GARAGE_TEST_INTEGRATION_DB_ENGINE=rocksdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
- rm result
- rm -rv tmp-garage-integration

View file

@ -9,11 +9,11 @@ depends_on:
steps:
- name: refresh-index
image: nixpkgs/nix:nixos-22.05
environment:
AWS_ACCESS_KEY_ID:
from_secret: garagehq_aws_access_key_id
AWS_SECRET_ACCESS_KEY:
from_secret: garagehq_aws_secret_access_key
secrets:
- source: garagehq_aws_access_key_id
target: AWS_ACCESS_KEY_ID
- source: garagehq_aws_secret_access_key
target: AWS_SECRET_ACCESS_KEY
commands:
- mkdir -p /etc/nix && cp nix/nix.conf /etc/nix/nix.conf
- nix-shell --attr ci --run "refresh_index"

View file

@ -48,10 +48,11 @@ steps:
image: nixpkgs/nix:nixos-22.05
environment:
TARGET: "${TARGET}"
AWS_ACCESS_KEY_ID:
from_secret: garagehq_aws_access_key_id
AWS_SECRET_ACCESS_KEY:
from_secret: garagehq_aws_secret_access_key
secrets:
- source: garagehq_aws_access_key_id
target: AWS_ACCESS_KEY_ID
- source: garagehq_aws_secret_access_key
target: AWS_SECRET_ACCESS_KEY
commands:
- nix-shell --attr ci --run "to_s3"

1432
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -61,6 +61,7 @@ md-5 = "0.10"
mktemp = "0.5"
nix = { version = "0.27", default-features = false, features = ["fs"] }
nom = "7.1"
num_cpus = "1.0"
parse_duration = "2.1"
pin-project = "1.0.12"
pnet_datalink = "0.34"
@ -85,6 +86,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
rocksdb = { version = "0.22", features = ["multi-threaded-cf"] }
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View file

@ -16,7 +16,6 @@ data_dir = "/var/lib/garage/data"
metadata_fsync = true
data_fsync = false
disable_scrub = false
use_local_tz = false
metadata_auto_snapshot_interval = "6h"
db_engine = "lmdb"
@ -100,7 +99,6 @@ Top-level configuration options:
[`data_fsync`](#data_fsync),
[`db_engine`](#db_engine),
[`disable_scrub`](#disable_scrub),
[`use_local_tz`](#use_local_tz),
[`lmdb_map_size`](#lmdb_map_size),
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
[`metadata_dir`](#metadata_dir),
@ -429,13 +427,6 @@ you should delete it from the data directory and then call `garage repair
blocks` on the node to ensure that it re-obtains a copy from another node on
the network.
#### `use_local_tz` {#use_local_tz}
By default, Garage runs the lifecycle worker every day at midnight in UTC. Set the
`use_local_tz` configuration value to `true` if you want Garage to run the
lifecycle worker at midnight in your local timezone. If you have multiple nodes,
you should also ensure that each node has the same timezone configuration.
#### `block_size` {#block_size}
Garage splits stored objects in consecutive chunks of size `block_size`

View file

@ -57,6 +57,22 @@
}
},
"nixpkgs": {
"locked": {
"lastModified": 1724395761,
"narHash": "sha256-zRkDV/nbrnp3Y8oCADf5ETl1sDrdmAW6/bBVJ8EbIdQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "ae815cee91b417be55d43781eb4b73ae1ecc396c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1724681257,
"narHash": "sha256-EJRuc5Qp7yfXko5ZNeEMYAs4DzAvkCyALuJ/tGllhN4=",
@ -80,15 +96,12 @@
"cargo2nix",
"flake-utils"
],
"nixpkgs": "nixpkgs"
"nixpkgs": "nixpkgs_2"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": [
"cargo2nix",
"nixpkgs"
]
"nixpkgs": "nixpkgs"
},
"locked": {
"lastModified": 1724638882,
@ -101,7 +114,6 @@
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "19b70f147b9c67a759e35824b241f1ed92e46694",
"type": "github"
}
}

View file

@ -15,11 +15,14 @@ path = "lib.rs"
err-derive.workspace = true
hexdump.workspace = true
tracing.workspace = true
opentelemetry.workspace = true
heed = { workspace = true, optional = true }
rusqlite = { workspace = true, optional = true, features = ["backup"] }
r2d2 = { workspace = true, optional = true }
r2d2_sqlite = { workspace = true, optional = true }
rocksdb = { workspace = true, optional = true }
num_cpus = { workspace = true, optional = true }
[dev-dependencies]
mktemp.workspace = true
@ -29,3 +32,4 @@ default = [ "lmdb", "sqlite" ]
bundled-libs = [ "rusqlite?/bundled" ]
lmdb = [ "heed" ]
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
rocksdb = [ "dep:rocksdb", "dep:num_cpus" ]

View file

@ -5,6 +5,10 @@ extern crate tracing;
pub mod lmdb_adapter;
#[cfg(feature = "sqlite")]
pub mod sqlite_adapter;
#[cfg(feature = "rocksdb")]
pub mod rocksdb_adapter;
#[cfg(feature = "rocksdb")]
pub mod metric_proxy;
pub mod open;

217
src/db/metric_proxy.rs Normal file
View file

@ -0,0 +1,217 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use crate::rocksdb_adapter::RocksDb;
use crate::{
Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
Value, ValueIter,
};
use opentelemetry::{
global,
metrics::{Unit, ValueRecorder},
KeyValue,
};
/// Wrapper around a [`RocksDb`] handle that forwards every call to it while
/// timing the call and feeding the duration to a value recorder.
pub struct MetricDbProxy {
//@FIXME Replace with a template
/// Wrapped database handle that every operation is forwarded to.
db: RocksDb,
/// Recorder that receives one duration measurement per instrumented call.
op: ValueRecorder<f64>,
}
impl MetricDbProxy {
    /// Wraps `db` in a metric-recording proxy and returns it as a generic [`Db`] handle.
    ///
    /// The `db.op` value recorder is registered on the global `garage/db` meter.
    pub fn init(db: RocksDb) -> Db {
        let op = global::meter("garage/db")
            .f64_value_recorder("db.op")
            .with_description("Duration and amount of operations on the local metadata engine")
            .with_unit(Unit::new("ms"))
            .init();
        Db(Arc::new(Self { db, op }))
    }

    /// Runs `fx`, records how long it took (in milliseconds) and hands back its result
    /// untouched.
    ///
    /// Each measurement is tagged with the operation name (`op`), its category (`cat`)
    /// and the transaction marker (`tx`) supplied by the caller.
    fn instrument<T>(
        &self,
        fx: impl FnOnce() -> T,
        op: &'static str,
        cat: &'static str,
        tx: &'static str,
    ) -> T {
        let metric_tags = [
            KeyValue::new("op", op),
            KeyValue::new("cat", cat),
            KeyValue::new("tx", tx),
        ];

        let started_at = Instant::now();
        let outcome = fx();
        let elapsed_nanos = started_at.elapsed().as_nanos();

        // The recorder unit is "ms"; keep the sub-millisecond precision as a fraction.
        let elapsed_millis: f64 = elapsed_nanos as f64 / 1_000_000f64;
        self.op.record(elapsed_millis, &metric_tags);

        outcome
    }
}
/// Every method forwards to the wrapped database through `instrument`, tagging the
/// measurement with the operation name, a `control`/`data` category and `tx = "no"`
/// (except `transaction`, which is tagged `tx = "yes"`).
impl IDb for MetricDbProxy {
    /// Name of the wrapped engine, prefixed to make the proxy visible.
    fn engine(&self) -> String {
        let inner = self.db.engine();
        format!("Metric Proxy on {}", inner)
    }

    fn open_tree(&self, name: &str) -> Result<usize> {
        let call = || self.db.open_tree(name);
        self.instrument(call, "open_tree", "control", "no")
    }

    fn list_trees(&self) -> Result<Vec<String>> {
        let call = || self.db.list_trees();
        self.instrument(call, "list_trees", "control", "no")
    }

    fn snapshot(&self, to: &PathBuf) -> Result<()> {
        let call = || self.db.snapshot(to);
        self.instrument(call, "snapshot", "control", "no")
    }

    // ---

    fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
        let call = || self.db.get(tree, key);
        self.instrument(call, "get", "data", "no")
    }

    fn len(&self, tree: usize) -> Result<usize> {
        let call = || self.db.len(tree);
        self.instrument(call, "len", "data", "no")
    }

    fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
        let call = || self.db.insert(tree, key, value);
        self.instrument(call, "insert", "data", "no")
    }

    fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
        let call = || self.db.remove(tree, key);
        self.instrument(call, "remove", "data", "no")
    }

    fn clear(&self, tree: usize) -> Result<()> {
        let call = || self.db.clear(tree);
        self.instrument(call, "clear", "data", "no")
    }

    // Note: for the iterator-returning methods only the creation of the iterator is
    // timed, since the closure returns as soon as the iterator has been built.

    fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
        let call = || self.db.iter(tree);
        self.instrument(call, "iter", "data", "no")
    }

    fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
        let call = || self.db.iter_rev(tree);
        self.instrument(call, "iter_rev", "data", "no")
    }

    fn range<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> Result<ValueIter<'_>> {
        let call = || self.db.range(tree, low, high);
        self.instrument(call, "range", "data", "no")
    }

    fn range_rev<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> Result<ValueIter<'_>> {
        let call = || self.db.range_rev(tree, low, high);
        self.instrument(call, "range_rev", "data", "no")
    }

    // ----

    /// Runs `f` in a transaction of the wrapped database. The whole transaction is
    /// timed, and `f` is handed a transaction view whose operations are timed too.
    fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
        let call = || {
            let instrumented_fn = MetricITxFnProxy { f, metrics: self };
            self.db.transaction(&instrumented_fn)
        };
        self.instrument(call, "transaction", "control", "yes")
    }
}
/// Adapter that lets a caller-supplied transaction body run against an
/// instrumented transaction view instead of the raw one.
struct MetricITxFnProxy<'a> {
/// Transaction body provided by the caller.
f: &'a dyn ITxFn,
/// Proxy used to record the timing of each operation done by `f`.
metrics: &'a MetricDbProxy,
}
impl<'a> ITxFn for MetricITxFnProxy<'a> {
    /// Wraps `tx` in a [`MetricTxProxy`] and runs the caller's transaction body on it.
    fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
        let mut instrumented_tx = MetricTxProxy {
            tx,
            metrics: self.metrics,
        };
        self.f.try_on(&mut instrumented_tx)
    }
}
/// Transaction view that forwards every operation to the wrapped transaction
/// and records its duration through the owning [`MetricDbProxy`].
struct MetricTxProxy<'a> {
/// Underlying transaction that actually performs the operations.
tx: &'a mut dyn ITx,
/// Proxy whose recorder receives the measurements.
metrics: &'a MetricDbProxy,
}
/// Every method forwards to the wrapped transaction through
/// `MetricDbProxy::instrument`, tagged with category `data` and `tx = "yes"`.
impl<'a> ITx for MetricTxProxy<'a> {
    fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
        let inner = &*self.tx;
        self.metrics
            .instrument(|| inner.get(tree, key), "get", "data", "yes")
    }

    fn len(&self, tree: usize) -> TxOpResult<usize> {
        let inner = &*self.tx;
        self.metrics
            .instrument(|| inner.len(tree), "len", "data", "yes")
    }

    fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
        let recorder = self.metrics;
        let inner = &mut *self.tx;
        recorder.instrument(|| inner.insert(tree, key, value), "insert", "data", "yes")
    }

    fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
        let recorder = self.metrics;
        let inner = &mut *self.tx;
        recorder.instrument(|| inner.remove(tree, key), "remove", "data", "yes")
    }

    fn clear(&mut self, tree: usize) -> TxOpResult<()> {
        let recorder = self.metrics;
        let inner = &mut *self.tx;
        recorder.instrument(|| inner.clear(tree), "clear", "data", "yes")
    }

    fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
        let inner = &*self.tx;
        self.metrics
            .instrument(|| inner.iter(tree), "iter", "data", "yes")
    }

    fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
        let inner = &*self.tx;
        self.metrics
            .instrument(|| inner.iter_rev(tree), "iter_rev", "data", "yes")
    }

    fn range<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        let inner = &*self.tx;
        self.metrics
            .instrument(|| inner.range(tree, low, high), "range", "data", "yes")
    }

    fn range_rev<'r>(
        &self,
        tree: usize,
        low: Bound<&'r [u8]>,
        high: Bound<&'r [u8]>,
    ) -> TxOpResult<TxValueIter<'_>> {
        let inner = &*self.tx;
        self.metrics.instrument(
            || inner.range_rev(tree, low, high),
            "range_rev",
            "data",
            "yes",
        )
    }
}

View file

@ -11,6 +11,8 @@ use crate::{Db, Error, Result};
pub enum Engine {
Lmdb,
Sqlite,
RocksDb,
RocksDbWithMetrics,
}
impl Engine {
@ -19,6 +21,8 @@ impl Engine {
match self {
Self::Lmdb => "lmdb",
Self::Sqlite => "sqlite",
Self::RocksDb => "rocksdb",
Self::RocksDbWithMetrics => "rocksdb-with-metrics",
}
}
}
@ -36,6 +40,8 @@ impl std::str::FromStr for Engine {
match text {
"lmdb" | "heed" => Ok(Self::Lmdb),
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
"rocksdb" => Ok(Self::RocksDb),
"rocksdb-with-metrics" => Ok(Self::RocksDbWithMetrics),
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
kind => Err(Error(
format!(
@ -114,6 +120,31 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
}
}
// ---- RocksDB ----
#[cfg(feature = "rocksdb")]
Engine::RocksDb => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
Ok(crate::rocksdb_adapter::RocksDb::init(options, txn_options, path))
}
// ---- RocksDB (wrapped in the metrics proxy) ----
#[cfg(feature = "rocksdb")]
Engine::RocksDbWithMetrics => {
info!("Opening RocksDb database at: {}", path.display());
let mut options = rocksdb::Options::default();
options.increase_parallelism(num_cpus::get() as i32);
options.create_if_missing(true);
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
let txn_options = rocksdb::TransactionDBOptions::default();
let db = crate::rocksdb_adapter::RocksDb::new(options, txn_options, path);
Ok(crate::metric_proxy::MetricDbProxy::init(db))
}
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow
// attribute is added so that we won't have to change this match in case we stop building
// support for one or more engines by default.

322
src/db/rocksdb_adapter.rs Normal file
View file

@ -0,0 +1,322 @@
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use core::ops::Bound;
use rocksdb::{self as rks, BoundColumnFamily, Direction, IteratorMode, MultiThreaded, Options, Transaction, TransactionDB, TransactionDBOptions};
use crate::{
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
TxResult, TxValueIter, Value, ValueIter,
};
/// Database adapter backed by a RocksDB transactional database, where each
/// tree is stored as one column family.
pub struct RocksDb {
/// Underlying RocksDB transactional database handle.
db: TransactionDB<MultiThreaded>,
/// Column family bookkeeping: `.0` lists the column family names, the position
/// of a name being the tree index handed out by `open_tree`; `.1` is the
/// reverse map from name to that index.
column_families: RwLock<(Vec<String>, HashMap<String, usize>)>,
}
impl RocksDb {
    /// Opens (or creates) the database at `path` and wraps it in a generic [`Db`] handle.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`RocksDb::new`].
    pub fn init(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> Db {
        Db(Arc::new(Self::new(options, txn_options, path)))
    }

    /// Opens (or creates) the RocksDB database located at `path`.
    ///
    /// All column families already present on disk are opened, so that trees created
    /// by a previous run remain accessible.
    ///
    /// # Panics
    ///
    /// Panics if the database cannot be opened or if its column families cannot be
    /// listed once it is open.
    pub fn new(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> RocksDb {
        // RocksDB refuses to open a database read-write unless every existing column
        // family is named explicitly, so they have to be discovered beforehand.
        // Listing fails when the database does not exist yet: in that case there is
        // nothing to reopen and a plain open (which may create the database) is done.
        let on_disk_cf =
            TransactionDB::<MultiThreaded>::list_cf(&Options::default(), path).unwrap_or_default();

        let db = TransactionDB::open_cf(&options, &txn_options, path, &on_disk_cf)
            .expect("unable to open the RocksDB database");

        // List again now that the database is known to exist, so that the column
        // families created by the open itself are part of the index as well.
        let existing_cf = TransactionDB::<MultiThreaded>::list_cf(&Options::default(), path)
            .expect("unable to list the column families of the RocksDB database");
        let existing_cf_map = existing_cf
            .iter()
            .enumerate()
            .map(|(i, n)| (n.clone(), i))
            .collect();

        Self {
            db,
            column_families: RwLock::new((existing_cf, existing_cf_map)),
        }
    }

    /// Resolves a tree index into a handle on the matching column family.
    ///
    /// Returns an error if `tree` is not a known index or if RocksDB has no column
    /// family registered under the corresponding name.
    fn get_cf_handle(&self, tree: usize) -> Result<Arc<BoundColumnFamily>> {
        let column_families = self.column_families.read().unwrap();
        let name = column_families.0.get(tree);
        name.and_then(|n| self.db.cf_handle(n)).ok_or_else(|| {
            Error("trying to acquire a handle on a non-existing column family".into())
        })
    }
}
impl IDb for RocksDb {
fn engine(&self) -> String {
"rocksdb".into()
}
fn open_tree(&self, name: &str) -> Result<usize> {
let mut column_families = self.column_families.write().unwrap();
let open_tree = column_families.1.get(name);
if let Some(i) = open_tree {
Ok(*i)
} else {
self.db.create_cf(name, &Options::default())?;
column_families.0.push(name.to_string());
let i = column_families.0.len() - 1;
column_families.1.insert(name.to_string(), i);
Ok(i)
}
}
fn list_trees(&self) -> Result<Vec<String>> {
Ok(self.column_families.read().unwrap().0.clone())
}
fn snapshot(&self, path: &PathBuf) -> Result<()> {
todo!("snapshots for RocksDB");
}
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.get_cf(&cf_handle, key)?)
}
fn len(&self, tree: usize) -> Result<usize> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.iterator_cf(&cf_handle, IteratorMode::Start).count())
}
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.put_cf(&cf_handle, key, value)?)
}
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(self.db.delete_cf(&cf_handle, key)?)
}
fn clear(&self, tree: usize) -> Result<()> {
let column_families = self.column_families.write().unwrap(); // locking against open_tree
let tree_name = column_families.0.get(tree).ok_or(
Error("trying to clear a non-existing column family".into())
)?;
self.db.drop_cf(tree_name)?;
self.db.create_cf(tree_name, &Options::default())?;
Ok(())
}
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::Start)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
Ok(Box::new(
self.db.iterator_cf(&cf_handle, IteratorMode::End)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match low {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
Bound::Unbounded => IteratorMode::Start,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match high {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
))
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> Result<ValueIter<'_>> {
let cf_handle = self.get_cf_handle(tree)?;
let start_mode = match high {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
Bound::Unbounded => IteratorMode::End,
};
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match low {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
))
}
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
let txn = self.db.transaction();
let mut tx = RocksTx {
db: &self,
txn,
};
match f.try_on(&mut tx) {
TxFnResult::Ok(on_commit) => {
tx.txn.commit().map_err(Error::from).map_err(TxError::Db)?;
Ok(on_commit)
}
TxFnResult::Abort => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Abort(()))
}
TxFnResult::DbErr => {
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
Err(TxError::Db(Error(
"(this message will be discarded)".into(),
)))
}
}
}
}
/// Transaction view handed to transaction bodies run by `RocksDb::transaction`.
pub struct RocksTx<'a> {
/// Database the transaction belongs to, used to resolve tree indices into
/// column family handles.
db: &'a RocksDb,
/// Underlying RocksDB transaction that operations are issued against.
txn: Transaction<'a, TransactionDB<MultiThreaded>>,
}
impl<'a> ITx for RocksTx<'a> {
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.get_cf(&cf_handle, key)?)
}
fn len(&self, tree: usize) -> TxOpResult<usize> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.iterator_cf(&cf_handle, IteratorMode::Start).count())
}
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.put_cf(&cf_handle, key, value)?)
}
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(self.txn.delete_cf(&cf_handle, key)?)
}
fn clear(&mut self, _tree: usize) -> TxOpResult<()> {
unimplemented!("transactional column family clear not supported in RocksDB")
}
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(Box::new(
self.txn.iterator_cf(&cf_handle, IteratorMode::Start)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
Ok(Box::new(
self.txn.iterator_cf(&cf_handle, IteratorMode::End)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()))
))
}
fn range<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
let start_mode = match low {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
Bound::Unbounded => IteratorMode::Start,
};
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match high {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
))
}
fn range_rev<'r>(
&self,
tree: usize,
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let cf_handle = self.db.get_cf_handle(tree)?;
let start_mode = match high {
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
Bound::Unbounded => IteratorMode::End,
};
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
.map(|r| r
.map(|(k, v)| (k.into_vec(), v.into_vec()))
.map_err(|e| e.into()));
let stop_value = match low {
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
Bound::Unbounded => return Ok(Box::new(base_iterator)),
};
Ok(Box::new(
base_iterator
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
))
}
}
impl From<rks::Error> for Error {
fn from(e: rks::Error) -> Error {
Error(format!("RocksDB: {}", e).into())
}
}
impl From<rks::Error> for TxOpError {
fn from(e: rks::Error) -> TxOpError {
TxOpError(e.into())
}
}
impl From<Error> for TxOpError {
    /// Wraps a crate error into a transaction operation error.
    fn from(e: Error) -> TxOpError {
        // `e` already has the wrapped type: no conversion is needed.
        TxOpError(e)
    }
}

View file

@ -89,6 +89,7 @@ k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
rocksdb = [ "garage_model/rocksdb" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@ -47,3 +47,4 @@ default = [ "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ]
sqlite = [ "garage_db/sqlite" ]
rocksdb = [ "garage_db/rocksdb" ]

View file

@ -124,6 +124,9 @@ impl Garage {
db::Engine::Lmdb => {
db_path.push("db.lmdb");
}
db::Engine::RocksDb | db::Engine::RocksDbWithMetrics => {
db_path.push("db.rocksdb");
}
}
let db_opt = db::OpenOpt {
fsync: config.metadata_fsync,

View file

@ -70,7 +70,7 @@ pub fn register_bg_vars(
impl LifecycleWorker {
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
let today = today(garage.config.use_local_tz);
let today = today();
let last_completed = persister.get_with(|x| {
x.last_completed
.as_deref()
@ -205,9 +205,8 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState {
match &self.state {
State::Completed(d) => {
let use_local_tz = self.garage.config.use_local_tz;
let next_day = d.succ_opt().expect("no next day");
let next_start = midnight_ts(next_day, use_local_tz);
let next_start = midnight_ts(next_day);
loop {
let now = now_msec();
if now < next_start {
@ -219,7 +218,7 @@ impl Worker for LifecycleWorker {
break;
}
}
self.state = State::start(std::cmp::max(next_day, today(use_local_tz)));
self.state = State::start(std::cmp::max(next_day, today()));
}
State::Running { .. } => (),
}
@ -386,16 +385,10 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
true
}
fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
let midnight = date.and_hms_opt(0, 0, 0).expect("midnight does not exist");
if use_local_tz {
return midnight
.and_local_timezone(Local)
.single()
.expect("bad local midnight")
.timestamp_millis() as u64;
}
midnight.timestamp_millis() as u64
fn midnight_ts(date: NaiveDate) -> u64 {
date.and_hms_opt(0, 0, 0)
.expect("midnight does not exist")
.timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
@ -406,9 +399,6 @@ fn next_date(ts: u64) -> NaiveDate {
.expect("no next day")
}
fn today(use_local_tz: bool) -> NaiveDate {
if use_local_tz {
return Local::now().naive_local().date();
}
fn today() -> NaiveDate {
Utc::now().naive_utc().date()
}

View file

@ -807,16 +807,6 @@ impl NodeStatus {
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
use nix::sys::statvfs::statvfs;
// The HashMap used below requires a filesystem identifier from statfs (instead of statvfs) on FreeBSD, as
// FreeBSD's statvfs filesystem identifier is "not meaningful in this implementation" (man 3 statvfs).
#[cfg(target_os = "freebsd")]
let get_filesystem_id = |path: &Path| match nix::sys::statfs::statfs(path) {
Ok(fs) => Some(fs.filesystem_id()),
Err(_) => None,
};
let mount_avail = |path: &Path| match statvfs(path) {
Ok(x) => {
let avail = x.blocks_available() as u64 * x.fragment_size() as u64;
@ -827,7 +817,6 @@ impl NodeStatus {
};
self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t));
self.data_disk_avail = match data_dir {
DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)),
DataDirEnum::Multiple(dirs) => (|| {
@ -838,25 +827,12 @@ impl NodeStatus {
if dir.capacity.is_none() {
continue;
}
#[cfg(not(target_os = "freebsd"))]
match mount_avail(&dir.path) {
Some((fsid, avail, total)) => {
mounts.insert(fsid, (avail, total));
}
None => return None,
}
#[cfg(target_os = "freebsd")]
match get_filesystem_id(&dir.path) {
Some(fsid) => match mount_avail(&dir.path) {
Some((_, avail, total)) => {
mounts.insert(fsid, (avail, total));
}
None => return None,
},
None => return None,
}
}
Some(
mounts

View file

@ -27,10 +27,6 @@ pub struct Config {
#[serde(default)]
pub disable_scrub: bool,
/// Use local timezone
#[serde(default)]
pub use_local_tz: bool,
/// Automatic snapshot interval for metadata
#[serde(default)]
pub metadata_auto_snapshot_interval: Option<String>,