Compare commits
1 commit
main
...
feat/rocks
Author | SHA1 | Date | |
---|---|---|---|
8b3edd86e4 |
18 changed files with 1440 additions and 685 deletions
|
@ -37,6 +37,8 @@ steps:
|
||||||
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
||||||
- nix-shell --attr ci --run "killall -9 garage" || true
|
- nix-shell --attr ci --run "killall -9 garage" || true
|
||||||
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
||||||
|
- nix-shell --attr ci --run "killall -9 garage" || true
|
||||||
|
- GARAGE_TEST_INTEGRATION_DB_ENGINE=rocksdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
||||||
- rm result
|
- rm result
|
||||||
- rm -rv tmp-garage-integration
|
- rm -rv tmp-garage-integration
|
||||||
|
|
||||||
|
|
|
@ -9,11 +9,11 @@ depends_on:
|
||||||
steps:
|
steps:
|
||||||
- name: refresh-index
|
- name: refresh-index
|
||||||
image: nixpkgs/nix:nixos-22.05
|
image: nixpkgs/nix:nixos-22.05
|
||||||
environment:
|
secrets:
|
||||||
AWS_ACCESS_KEY_ID:
|
- source: garagehq_aws_access_key_id
|
||||||
from_secret: garagehq_aws_access_key_id
|
target: AWS_ACCESS_KEY_ID
|
||||||
AWS_SECRET_ACCESS_KEY:
|
- source: garagehq_aws_secret_access_key
|
||||||
from_secret: garagehq_aws_secret_access_key
|
target: AWS_SECRET_ACCESS_KEY
|
||||||
commands:
|
commands:
|
||||||
- mkdir -p /etc/nix && cp nix/nix.conf /etc/nix/nix.conf
|
- mkdir -p /etc/nix && cp nix/nix.conf /etc/nix/nix.conf
|
||||||
- nix-shell --attr ci --run "refresh_index"
|
- nix-shell --attr ci --run "refresh_index"
|
||||||
|
|
|
@ -48,10 +48,11 @@ steps:
|
||||||
image: nixpkgs/nix:nixos-22.05
|
image: nixpkgs/nix:nixos-22.05
|
||||||
environment:
|
environment:
|
||||||
TARGET: "${TARGET}"
|
TARGET: "${TARGET}"
|
||||||
AWS_ACCESS_KEY_ID:
|
secrets:
|
||||||
from_secret: garagehq_aws_access_key_id
|
- source: garagehq_aws_access_key_id
|
||||||
AWS_SECRET_ACCESS_KEY:
|
target: AWS_ACCESS_KEY_ID
|
||||||
from_secret: garagehq_aws_secret_access_key
|
- source: garagehq_aws_secret_access_key
|
||||||
|
target: AWS_SECRET_ACCESS_KEY
|
||||||
commands:
|
commands:
|
||||||
- nix-shell --attr ci --run "to_s3"
|
- nix-shell --attr ci --run "to_s3"
|
||||||
|
|
||||||
|
|
1432
Cargo.lock
generated
1432
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -61,6 +61,7 @@ md-5 = "0.10"
|
||||||
mktemp = "0.5"
|
mktemp = "0.5"
|
||||||
nix = { version = "0.27", default-features = false, features = ["fs"] }
|
nix = { version = "0.27", default-features = false, features = ["fs"] }
|
||||||
nom = "7.1"
|
nom = "7.1"
|
||||||
|
num_cpus = "1.0"
|
||||||
parse_duration = "2.1"
|
parse_duration = "2.1"
|
||||||
pin-project = "1.0.12"
|
pin-project = "1.0.12"
|
||||||
pnet_datalink = "0.34"
|
pnet_datalink = "0.34"
|
||||||
|
@ -85,6 +86,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
|
||||||
rusqlite = "0.31.0"
|
rusqlite = "0.31.0"
|
||||||
r2d2 = "0.8"
|
r2d2 = "0.8"
|
||||||
r2d2_sqlite = "0.24"
|
r2d2_sqlite = "0.24"
|
||||||
|
rocksdb = { version = "0.22", features = ["multi-threaded-cf"] }
|
||||||
|
|
||||||
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
|
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
|
||||||
zstd = { version = "0.13", default-features = false }
|
zstd = { version = "0.13", default-features = false }
|
||||||
|
|
|
@ -16,7 +16,6 @@ data_dir = "/var/lib/garage/data"
|
||||||
metadata_fsync = true
|
metadata_fsync = true
|
||||||
data_fsync = false
|
data_fsync = false
|
||||||
disable_scrub = false
|
disable_scrub = false
|
||||||
use_local_tz = false
|
|
||||||
metadata_auto_snapshot_interval = "6h"
|
metadata_auto_snapshot_interval = "6h"
|
||||||
|
|
||||||
db_engine = "lmdb"
|
db_engine = "lmdb"
|
||||||
|
@ -100,7 +99,6 @@ Top-level configuration options:
|
||||||
[`data_fsync`](#data_fsync),
|
[`data_fsync`](#data_fsync),
|
||||||
[`db_engine`](#db_engine),
|
[`db_engine`](#db_engine),
|
||||||
[`disable_scrub`](#disable_scrub),
|
[`disable_scrub`](#disable_scrub),
|
||||||
[`use_local_tz`](#use_local_tz),
|
|
||||||
[`lmdb_map_size`](#lmdb_map_size),
|
[`lmdb_map_size`](#lmdb_map_size),
|
||||||
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
|
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
|
||||||
[`metadata_dir`](#metadata_dir),
|
[`metadata_dir`](#metadata_dir),
|
||||||
|
@ -429,13 +427,6 @@ you should delete it from the data directory and then call `garage repair
|
||||||
blocks` on the node to ensure that it re-obtains a copy from another node on
|
blocks` on the node to ensure that it re-obtains a copy from another node on
|
||||||
the network.
|
the network.
|
||||||
|
|
||||||
#### `use_local_tz` {#use_local_tz}
|
|
||||||
|
|
||||||
By default, Garage runs the lifecycle worker every day at midnight in UTC. Set the
|
|
||||||
`use_local_tz` configuration value to `true` if you want Garage to run the
|
|
||||||
lifecycle worker at midnight in your local timezone. If you have multiple nodes,
|
|
||||||
you should also ensure that each node has the same timezone configuration.
|
|
||||||
|
|
||||||
#### `block_size` {#block_size}
|
#### `block_size` {#block_size}
|
||||||
|
|
||||||
Garage splits stored objects in consecutive chunks of size `block_size`
|
Garage splits stored objects in consecutive chunks of size `block_size`
|
||||||
|
|
24
flake.lock
24
flake.lock
|
@ -57,6 +57,22 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1724395761,
|
||||||
|
"narHash": "sha256-zRkDV/nbrnp3Y8oCADf5ETl1sDrdmAW6/bBVJ8EbIdQ=",
|
||||||
|
"owner": "NixOS",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"rev": "ae815cee91b417be55d43781eb4b73ae1ecc396c",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "NixOS",
|
||||||
|
"ref": "nixpkgs-unstable",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nixpkgs_2": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1724681257,
|
"lastModified": 1724681257,
|
||||||
"narHash": "sha256-EJRuc5Qp7yfXko5ZNeEMYAs4DzAvkCyALuJ/tGllhN4=",
|
"narHash": "sha256-EJRuc5Qp7yfXko5ZNeEMYAs4DzAvkCyALuJ/tGllhN4=",
|
||||||
|
@ -80,15 +96,12 @@
|
||||||
"cargo2nix",
|
"cargo2nix",
|
||||||
"flake-utils"
|
"flake-utils"
|
||||||
],
|
],
|
||||||
"nixpkgs": "nixpkgs"
|
"nixpkgs": "nixpkgs_2"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"rust-overlay": {
|
"rust-overlay": {
|
||||||
"inputs": {
|
"inputs": {
|
||||||
"nixpkgs": [
|
"nixpkgs": "nixpkgs"
|
||||||
"cargo2nix",
|
|
||||||
"nixpkgs"
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1724638882,
|
"lastModified": 1724638882,
|
||||||
|
@ -101,7 +114,6 @@
|
||||||
"original": {
|
"original": {
|
||||||
"owner": "oxalica",
|
"owner": "oxalica",
|
||||||
"repo": "rust-overlay",
|
"repo": "rust-overlay",
|
||||||
"rev": "19b70f147b9c67a759e35824b241f1ed92e46694",
|
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,14 @@ path = "lib.rs"
|
||||||
err-derive.workspace = true
|
err-derive.workspace = true
|
||||||
hexdump.workspace = true
|
hexdump.workspace = true
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
opentelemetry.workspace = true
|
||||||
|
|
||||||
heed = { workspace = true, optional = true }
|
heed = { workspace = true, optional = true }
|
||||||
rusqlite = { workspace = true, optional = true, features = ["backup"] }
|
rusqlite = { workspace = true, optional = true, features = ["backup"] }
|
||||||
r2d2 = { workspace = true, optional = true }
|
r2d2 = { workspace = true, optional = true }
|
||||||
r2d2_sqlite = { workspace = true, optional = true }
|
r2d2_sqlite = { workspace = true, optional = true }
|
||||||
|
rocksdb = { workspace = true, optional = true }
|
||||||
|
num_cpus = { workspace = true, optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
mktemp.workspace = true
|
mktemp.workspace = true
|
||||||
|
@ -29,3 +32,4 @@ default = [ "lmdb", "sqlite" ]
|
||||||
bundled-libs = [ "rusqlite?/bundled" ]
|
bundled-libs = [ "rusqlite?/bundled" ]
|
||||||
lmdb = [ "heed" ]
|
lmdb = [ "heed" ]
|
||||||
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
|
sqlite = [ "rusqlite", "r2d2", "r2d2_sqlite" ]
|
||||||
|
rocksdb = [ "dep:rocksdb", "dep:num_cpus" ]
|
||||||
|
|
|
@ -5,6 +5,10 @@ extern crate tracing;
|
||||||
pub mod lmdb_adapter;
|
pub mod lmdb_adapter;
|
||||||
#[cfg(feature = "sqlite")]
|
#[cfg(feature = "sqlite")]
|
||||||
pub mod sqlite_adapter;
|
pub mod sqlite_adapter;
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
pub mod rocksdb_adapter;
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
pub mod metric_proxy;
|
||||||
|
|
||||||
pub mod open;
|
pub mod open;
|
||||||
|
|
||||||
|
|
217
src/db/metric_proxy.rs
Normal file
217
src/db/metric_proxy.rs
Normal file
|
@ -0,0 +1,217 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use crate::rocksdb_adapter::RocksDb;
|
||||||
|
use crate::{
|
||||||
|
Bound, Db, IDb, ITx, ITxFn, OnCommit, Result, TxFnResult, TxOpResult, TxResult, TxValueIter,
|
||||||
|
Value, ValueIter,
|
||||||
|
};
|
||||||
|
use opentelemetry::{
|
||||||
|
global,
|
||||||
|
metrics::{Unit, ValueRecorder},
|
||||||
|
KeyValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct MetricDbProxy {
|
||||||
|
//@FIXME Replace with a template
|
||||||
|
db: RocksDb,
|
||||||
|
op: ValueRecorder<f64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MetricDbProxy {
|
||||||
|
pub fn init(db: RocksDb) -> Db {
|
||||||
|
let meter = global::meter("garage/db");
|
||||||
|
let s = Self {
|
||||||
|
db,
|
||||||
|
op: meter
|
||||||
|
.f64_value_recorder("db.op")
|
||||||
|
.with_description("Duration and amount of operations on the local metadata engine")
|
||||||
|
.with_unit(Unit::new("ms"))
|
||||||
|
.init(),
|
||||||
|
};
|
||||||
|
Db(Arc::new(s))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn instrument<T>(
|
||||||
|
&self,
|
||||||
|
fx: impl FnOnce() -> T,
|
||||||
|
op: &'static str,
|
||||||
|
cat: &'static str,
|
||||||
|
tx: &'static str,
|
||||||
|
) -> T {
|
||||||
|
let metric_tags = [
|
||||||
|
KeyValue::new("op", op),
|
||||||
|
KeyValue::new("cat", cat),
|
||||||
|
KeyValue::new("tx", tx),
|
||||||
|
];
|
||||||
|
|
||||||
|
let request_start = Instant::now();
|
||||||
|
let res = fx();
|
||||||
|
let delay_nanos = Instant::now()
|
||||||
|
.saturating_duration_since(request_start)
|
||||||
|
.as_nanos();
|
||||||
|
let delay_millis: f64 = delay_nanos as f64 / 1_000_000f64;
|
||||||
|
self.op.record(delay_millis, &metric_tags);
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IDb for MetricDbProxy {
|
||||||
|
fn engine(&self) -> String {
|
||||||
|
format!("Metric Proxy on {}", self.db.engine())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_tree(&self, name: &str) -> Result<usize> {
|
||||||
|
self.instrument(|| self.db.open_tree(name), "open_tree", "control", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_trees(&self) -> Result<Vec<String>> {
|
||||||
|
self.instrument(|| self.db.list_trees(), "list_trees", "control", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self, to: &PathBuf) -> Result<()> {
|
||||||
|
self.instrument(|| self.db.snapshot(to), "snapshot", "control", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---
|
||||||
|
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
|
||||||
|
self.instrument(|| self.db.get(tree, key), "get", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self, tree: usize) -> Result<usize> {
|
||||||
|
self.instrument(|| self.db.len(tree), "len", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
self.instrument(|| self.db.insert(tree, key, value), "insert", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
|
||||||
|
self.instrument(|| self.db.remove(tree, key), "remove", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&self, tree: usize) -> Result<()> {
|
||||||
|
self.instrument(|| self.db.clear(tree), "clear", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
|
self.instrument(|| self.db.iter(tree), "iter", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
|
self.instrument(|| self.db.iter_rev(tree), "iter_rev", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> Result<ValueIter<'_>> {
|
||||||
|
self.instrument(|| self.db.range(tree, low, high), "range", "data", "no")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_rev<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> Result<ValueIter<'_>> {
|
||||||
|
self.instrument(
|
||||||
|
|| self.db.range_rev(tree, low, high),
|
||||||
|
"range_rev",
|
||||||
|
"data",
|
||||||
|
"no",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||||
|
self.instrument(
|
||||||
|
|| self.db.transaction(&MetricITxFnProxy { f, metrics: self }),
|
||||||
|
"transaction",
|
||||||
|
"control",
|
||||||
|
"yes",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MetricITxFnProxy<'a> {
|
||||||
|
f: &'a dyn ITxFn,
|
||||||
|
metrics: &'a MetricDbProxy,
|
||||||
|
}
|
||||||
|
impl<'a> ITxFn for MetricITxFnProxy<'a> {
|
||||||
|
fn try_on(&self, tx: &mut dyn ITx) -> TxFnResult {
|
||||||
|
self.f.try_on(&mut MetricTxProxy {
|
||||||
|
tx,
|
||||||
|
metrics: self.metrics,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MetricTxProxy<'a> {
|
||||||
|
tx: &'a mut dyn ITx,
|
||||||
|
metrics: &'a MetricDbProxy,
|
||||||
|
}
|
||||||
|
impl<'a> ITx for MetricTxProxy<'a> {
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.get(tree, key), "get", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self, tree: usize) -> TxOpResult<usize> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.len(tree), "len", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.insert(tree, key, value), "insert", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.remove(tree, key), "remove", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self, tree: usize) -> TxOpResult<()> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.clear(tree), "clear", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.iter(tree), "iter", "data", "yes")
|
||||||
|
}
|
||||||
|
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.iter_rev(tree), "iter_rev", "data", "yes")
|
||||||
|
}
|
||||||
|
fn range<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
self.metrics
|
||||||
|
.instrument(|| self.tx.range(tree, low, high), "range", "data", "yes")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_rev<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
self.metrics.instrument(
|
||||||
|
|| self.tx.range_rev(tree, low, high),
|
||||||
|
"range_rev",
|
||||||
|
"data",
|
||||||
|
"yes",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,6 +11,8 @@ use crate::{Db, Error, Result};
|
||||||
pub enum Engine {
|
pub enum Engine {
|
||||||
Lmdb,
|
Lmdb,
|
||||||
Sqlite,
|
Sqlite,
|
||||||
|
RocksDb,
|
||||||
|
RocksDbWithMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
|
@ -19,6 +21,8 @@ impl Engine {
|
||||||
match self {
|
match self {
|
||||||
Self::Lmdb => "lmdb",
|
Self::Lmdb => "lmdb",
|
||||||
Self::Sqlite => "sqlite",
|
Self::Sqlite => "sqlite",
|
||||||
|
Self::RocksDb => "rocksdb",
|
||||||
|
Self::RocksDbWithMetrics => "rocksdb-with-metrics",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,6 +40,8 @@ impl std::str::FromStr for Engine {
|
||||||
match text {
|
match text {
|
||||||
"lmdb" | "heed" => Ok(Self::Lmdb),
|
"lmdb" | "heed" => Ok(Self::Lmdb),
|
||||||
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
|
"sqlite" | "sqlite3" | "rusqlite" => Ok(Self::Sqlite),
|
||||||
|
"rocksdb" => Ok(Self::RocksDb),
|
||||||
|
"rocksdb-with-metrics" => Ok(Self::RocksDbWithMetrics),
|
||||||
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
|
"sled" => Err(Error("Sled is no longer supported as a database engine. Converting your old metadata db can be done using an older Garage binary (e.g. v0.9.4).".into())),
|
||||||
kind => Err(Error(
|
kind => Err(Error(
|
||||||
format!(
|
format!(
|
||||||
|
@ -114,6 +120,31 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- RocksDB ----
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
Engine::RocksDb => {
|
||||||
|
info!("Opening RocksDb database at: {}", path.display());
|
||||||
|
let mut options = rocksdb::Options::default();
|
||||||
|
options.increase_parallelism(num_cpus::get() as i32);
|
||||||
|
options.create_if_missing(true);
|
||||||
|
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||||
|
let txn_options = rocksdb::TransactionDBOptions::default();
|
||||||
|
Ok(crate::rocksdb_adapter::RocksDb::init(options, txn_options, path))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- RocksDB ----
|
||||||
|
#[cfg(feature = "rocksdb")]
|
||||||
|
Engine::RocksDbWithMetrics => {
|
||||||
|
info!("Opening RocksDb database at: {}", path.display());
|
||||||
|
let mut options = rocksdb::Options::default();
|
||||||
|
options.increase_parallelism(num_cpus::get() as i32);
|
||||||
|
options.create_if_missing(true);
|
||||||
|
options.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||||
|
let txn_options = rocksdb::TransactionDBOptions::default();
|
||||||
|
let db = crate::rocksdb_adapter::RocksDb::new(options, txn_options, path);
|
||||||
|
Ok(crate::metric_proxy::MetricDbProxy::init(db))
|
||||||
|
}
|
||||||
|
|
||||||
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow
|
// Pattern is unreachable when all supported DB engines are compiled into binary. The allow
|
||||||
// attribute is added so that we won't have to change this match in case stop building
|
// attribute is added so that we won't have to change this match in case stop building
|
||||||
// support for one or more engines by default.
|
// support for one or more engines by default.
|
||||||
|
|
322
src/db/rocksdb_adapter.rs
Normal file
322
src/db/rocksdb_adapter.rs
Normal file
|
@ -0,0 +1,322 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use core::ops::Bound;
|
||||||
|
|
||||||
|
use rocksdb::{self as rks, BoundColumnFamily, Direction, IteratorMode, MultiThreaded, Options, Transaction, TransactionDB, TransactionDBOptions};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult,
|
||||||
|
TxResult, TxValueIter, Value, ValueIter,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct RocksDb {
|
||||||
|
db: TransactionDB<MultiThreaded>,
|
||||||
|
column_families: RwLock<(Vec<String>, HashMap<String, usize>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RocksDb {
|
||||||
|
pub fn init(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> Db {
|
||||||
|
Db(Arc::new(Self::new(options, txn_options, path)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new(options: Options, txn_options: TransactionDBOptions, path: &PathBuf) -> RocksDb {
|
||||||
|
let db = TransactionDB::open(&options, &txn_options, path).unwrap();
|
||||||
|
let existing_cf = TransactionDB::<MultiThreaded>::list_cf(&Options::default(), path).unwrap();
|
||||||
|
let existing_cf_map = existing_cf.iter().enumerate().map(|(i, n)| (n.clone(), i)).collect();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
db,
|
||||||
|
column_families: RwLock::new((existing_cf, existing_cf_map)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_cf_handle(&self, tree: usize) -> Result<Arc<BoundColumnFamily>> {
|
||||||
|
let column_families = self.column_families.read().unwrap();
|
||||||
|
let name = column_families.0.get(tree);
|
||||||
|
name.map(|n| self.db.cf_handle(n)).flatten().ok_or(
|
||||||
|
Error("trying to acquire a handle on a non-existing column family".into())
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IDb for RocksDb {
|
||||||
|
fn engine(&self) -> String {
|
||||||
|
"rocksdb".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_tree(&self, name: &str) -> Result<usize> {
|
||||||
|
let mut column_families = self.column_families.write().unwrap();
|
||||||
|
let open_tree = column_families.1.get(name);
|
||||||
|
|
||||||
|
if let Some(i) = open_tree {
|
||||||
|
Ok(*i)
|
||||||
|
} else {
|
||||||
|
self.db.create_cf(name, &Options::default())?;
|
||||||
|
column_families.0.push(name.to_string());
|
||||||
|
let i = column_families.0.len() - 1;
|
||||||
|
column_families.1.insert(name.to_string(), i);
|
||||||
|
Ok(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_trees(&self) -> Result<Vec<String>> {
|
||||||
|
Ok(self.column_families.read().unwrap().0.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn snapshot(&self, path: &PathBuf) -> Result<()> {
|
||||||
|
todo!("snapshots for RocksDB");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(self.db.get_cf(&cf_handle, key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self, tree: usize) -> Result<usize> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(self.db.iterator_cf(&cf_handle, IteratorMode::Start).count())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(self.db.put_cf(&cf_handle, key, value)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&self, tree: usize, key: &[u8]) -> Result<()> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(self.db.delete_cf(&cf_handle, key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&self, tree: usize) -> Result<()> {
|
||||||
|
let column_families = self.column_families.write().unwrap(); // locking against open_tree
|
||||||
|
let tree_name = column_families.0.get(tree).ok_or(
|
||||||
|
Error("trying to clear a non-existing column family".into())
|
||||||
|
)?;
|
||||||
|
self.db.drop_cf(tree_name)?;
|
||||||
|
self.db.create_cf(tree_name, &Options::default())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(Box::new(
|
||||||
|
self.db.iterator_cf(&cf_handle, IteratorMode::Start)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
Ok(Box::new(
|
||||||
|
self.db.iterator_cf(&cf_handle, IteratorMode::End)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> Result<ValueIter<'_>> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
let start_mode = match low {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
|
||||||
|
Bound::Unbounded => IteratorMode::Start,
|
||||||
|
};
|
||||||
|
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()));
|
||||||
|
|
||||||
|
let stop_value = match high {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
|
||||||
|
Bound::Unbounded => return Ok(Box::new(base_iterator)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::new(
|
||||||
|
base_iterator
|
||||||
|
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_rev<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> Result<ValueIter<'_>> {
|
||||||
|
let cf_handle = self.get_cf_handle(tree)?;
|
||||||
|
let start_mode = match high {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
|
||||||
|
Bound::Unbounded => IteratorMode::End,
|
||||||
|
};
|
||||||
|
let base_iterator = self.db.iterator_cf(&cf_handle, start_mode)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()));
|
||||||
|
|
||||||
|
let stop_value = match low {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
|
||||||
|
Bound::Unbounded => return Ok(Box::new(base_iterator)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::new(
|
||||||
|
base_iterator
|
||||||
|
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<OnCommit, ()> {
|
||||||
|
let txn = self.db.transaction();
|
||||||
|
let mut tx = RocksTx {
|
||||||
|
db: &self,
|
||||||
|
txn,
|
||||||
|
};
|
||||||
|
|
||||||
|
match f.try_on(&mut tx) {
|
||||||
|
TxFnResult::Ok(on_commit) => {
|
||||||
|
tx.txn.commit().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
|
Ok(on_commit)
|
||||||
|
}
|
||||||
|
TxFnResult::Abort => {
|
||||||
|
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
|
Err(TxError::Abort(()))
|
||||||
|
}
|
||||||
|
TxFnResult::DbErr => {
|
||||||
|
tx.txn.rollback().map_err(Error::from).map_err(TxError::Db)?;
|
||||||
|
Err(TxError::Db(Error(
|
||||||
|
"(this message will be discarded)".into(),
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RocksTx<'a> {
|
||||||
|
db: &'a RocksDb,
|
||||||
|
txn: Transaction<'a, TransactionDB<MultiThreaded>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> ITx for RocksTx<'a> {
|
||||||
|
fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(self.txn.get_cf(&cf_handle, key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn len(&self, tree: usize) -> TxOpResult<usize> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(self.txn.iterator_cf(&cf_handle, IteratorMode::Start).count())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(self.txn.put_cf(&cf_handle, key, value)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(self.txn.delete_cf(&cf_handle, key)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self, _tree: usize) -> TxOpResult<()> {
|
||||||
|
unimplemented!("transactional column family clear not supported in RocksDB")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(Box::new(
|
||||||
|
self.txn.iterator_cf(&cf_handle, IteratorMode::Start)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
Ok(Box::new(
|
||||||
|
self.txn.iterator_cf(&cf_handle, IteratorMode::End)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
let start_mode = match low {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Forward),
|
||||||
|
Bound::Unbounded => IteratorMode::Start,
|
||||||
|
};
|
||||||
|
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()));
|
||||||
|
|
||||||
|
let stop_value = match high {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
|
||||||
|
Bound::Unbounded => return Ok(Box::new(base_iterator)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::new(
|
||||||
|
base_iterator
|
||||||
|
.take_while(move |r| r.as_ref().map(|(k, _v)| *k < stop_value).unwrap_or(false))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn range_rev<'r>(
|
||||||
|
&self,
|
||||||
|
tree: usize,
|
||||||
|
low: Bound<&'r [u8]>,
|
||||||
|
high: Bound<&'r [u8]>,
|
||||||
|
) -> TxOpResult<TxValueIter<'_>> {
|
||||||
|
let cf_handle = self.db.get_cf_handle(tree)?;
|
||||||
|
let start_mode = match high {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => IteratorMode::From(i, Direction::Reverse),
|
||||||
|
Bound::Unbounded => IteratorMode::End,
|
||||||
|
};
|
||||||
|
let base_iterator = self.txn.iterator_cf(&cf_handle, start_mode)
|
||||||
|
.map(|r| r
|
||||||
|
.map(|(k, v)| (k.into_vec(), v.into_vec()))
|
||||||
|
.map_err(|e| e.into()));
|
||||||
|
|
||||||
|
let stop_value = match low {
|
||||||
|
Bound::Included(i) | Bound::Excluded(i) => i.to_vec(),
|
||||||
|
Bound::Unbounded => return Ok(Box::new(base_iterator)),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Box::new(
|
||||||
|
base_iterator
|
||||||
|
.take_while(move |r| r.as_ref().map(|(k, _v)| *k >= stop_value).unwrap_or(false))
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<rks::Error> for Error {
|
||||||
|
fn from(e: rks::Error) -> Error {
|
||||||
|
Error(format!("RocksDB: {}", e).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<rks::Error> for TxOpError {
|
||||||
|
fn from(e: rks::Error) -> TxOpError {
|
||||||
|
TxOpError(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Error> for TxOpError {
|
||||||
|
fn from(e: Error) -> TxOpError {
|
||||||
|
TxOpError(e.into())
|
||||||
|
}
|
||||||
|
}
|
|
@ -89,6 +89,7 @@ k2v = [ "garage_util/k2v", "garage_api/k2v" ]
|
||||||
# Database engines
|
# Database engines
|
||||||
lmdb = [ "garage_model/lmdb" ]
|
lmdb = [ "garage_model/lmdb" ]
|
||||||
sqlite = [ "garage_model/sqlite" ]
|
sqlite = [ "garage_model/sqlite" ]
|
||||||
|
rocksdb = [ "garage_model/rocksdb" ]
|
||||||
|
|
||||||
# Automatic registration and discovery via Consul API
|
# Automatic registration and discovery via Consul API
|
||||||
consul-discovery = [ "garage_rpc/consul-discovery" ]
|
consul-discovery = [ "garage_rpc/consul-discovery" ]
|
||||||
|
|
|
@ -47,3 +47,4 @@ default = [ "lmdb", "sqlite" ]
|
||||||
k2v = [ "garage_util/k2v" ]
|
k2v = [ "garage_util/k2v" ]
|
||||||
lmdb = [ "garage_db/lmdb" ]
|
lmdb = [ "garage_db/lmdb" ]
|
||||||
sqlite = [ "garage_db/sqlite" ]
|
sqlite = [ "garage_db/sqlite" ]
|
||||||
|
rocksdb = [ "garage_db/rocksdb" ]
|
||||||
|
|
|
@ -124,6 +124,9 @@ impl Garage {
|
||||||
db::Engine::Lmdb => {
|
db::Engine::Lmdb => {
|
||||||
db_path.push("db.lmdb");
|
db_path.push("db.lmdb");
|
||||||
}
|
}
|
||||||
|
db::Engine::RocksDb | db::Engine::RocksDbWithMetrics => {
|
||||||
|
db_path.push("db.rocksdb");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let db_opt = db::OpenOpt {
|
let db_opt = db::OpenOpt {
|
||||||
fsync: config.metadata_fsync,
|
fsync: config.metadata_fsync,
|
||||||
|
|
|
@ -70,7 +70,7 @@ pub fn register_bg_vars(
|
||||||
|
|
||||||
impl LifecycleWorker {
|
impl LifecycleWorker {
|
||||||
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
|
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
|
||||||
let today = today(garage.config.use_local_tz);
|
let today = today();
|
||||||
let last_completed = persister.get_with(|x| {
|
let last_completed = persister.get_with(|x| {
|
||||||
x.last_completed
|
x.last_completed
|
||||||
.as_deref()
|
.as_deref()
|
||||||
|
@ -205,9 +205,8 @@ impl Worker for LifecycleWorker {
|
||||||
async fn wait_for_work(&mut self) -> WorkerState {
|
async fn wait_for_work(&mut self) -> WorkerState {
|
||||||
match &self.state {
|
match &self.state {
|
||||||
State::Completed(d) => {
|
State::Completed(d) => {
|
||||||
let use_local_tz = self.garage.config.use_local_tz;
|
|
||||||
let next_day = d.succ_opt().expect("no next day");
|
let next_day = d.succ_opt().expect("no next day");
|
||||||
let next_start = midnight_ts(next_day, use_local_tz);
|
let next_start = midnight_ts(next_day);
|
||||||
loop {
|
loop {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
if now < next_start {
|
if now < next_start {
|
||||||
|
@ -219,7 +218,7 @@ impl Worker for LifecycleWorker {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.state = State::start(std::cmp::max(next_day, today(use_local_tz)));
|
self.state = State::start(std::cmp::max(next_day, today()));
|
||||||
}
|
}
|
||||||
State::Running { .. } => (),
|
State::Running { .. } => (),
|
||||||
}
|
}
|
||||||
|
@ -386,16 +385,10 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
|
fn midnight_ts(date: NaiveDate) -> u64 {
|
||||||
let midnight = date.and_hms_opt(0, 0, 0).expect("midnight does not exist");
|
date.and_hms_opt(0, 0, 0)
|
||||||
if use_local_tz {
|
.expect("midnight does not exist")
|
||||||
return midnight
|
.timestamp_millis() as u64
|
||||||
.and_local_timezone(Local)
|
|
||||||
.single()
|
|
||||||
.expect("bad local midnight")
|
|
||||||
.timestamp_millis() as u64;
|
|
||||||
}
|
|
||||||
midnight.timestamp_millis() as u64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_date(ts: u64) -> NaiveDate {
|
fn next_date(ts: u64) -> NaiveDate {
|
||||||
|
@ -406,9 +399,6 @@ fn next_date(ts: u64) -> NaiveDate {
|
||||||
.expect("no next day")
|
.expect("no next day")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn today(use_local_tz: bool) -> NaiveDate {
|
fn today() -> NaiveDate {
|
||||||
if use_local_tz {
|
|
||||||
return Local::now().naive_local().date();
|
|
||||||
}
|
|
||||||
Utc::now().naive_utc().date()
|
Utc::now().naive_utc().date()
|
||||||
}
|
}
|
||||||
|
|
|
@ -807,16 +807,6 @@ impl NodeStatus {
|
||||||
|
|
||||||
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
|
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &DataDirEnum) {
|
||||||
use nix::sys::statvfs::statvfs;
|
use nix::sys::statvfs::statvfs;
|
||||||
|
|
||||||
// The HashMap used below requires a filesystem identifier from statfs (instead of statvfs) on FreeBSD, as
|
|
||||||
// FreeBSD's statvfs filesystem identifier is "not meaningful in this implementation" (man 3 statvfs).
|
|
||||||
|
|
||||||
#[cfg(target_os = "freebsd")]
|
|
||||||
let get_filesystem_id = |path: &Path| match nix::sys::statfs::statfs(path) {
|
|
||||||
Ok(fs) => Some(fs.filesystem_id()),
|
|
||||||
Err(_) => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mount_avail = |path: &Path| match statvfs(path) {
|
let mount_avail = |path: &Path| match statvfs(path) {
|
||||||
Ok(x) => {
|
Ok(x) => {
|
||||||
let avail = x.blocks_available() as u64 * x.fragment_size() as u64;
|
let avail = x.blocks_available() as u64 * x.fragment_size() as u64;
|
||||||
|
@ -827,7 +817,6 @@ impl NodeStatus {
|
||||||
};
|
};
|
||||||
|
|
||||||
self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t));
|
self.meta_disk_avail = mount_avail(meta_dir).map(|(_, a, t)| (a, t));
|
||||||
|
|
||||||
self.data_disk_avail = match data_dir {
|
self.data_disk_avail = match data_dir {
|
||||||
DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)),
|
DataDirEnum::Single(dir) => mount_avail(dir).map(|(_, a, t)| (a, t)),
|
||||||
DataDirEnum::Multiple(dirs) => (|| {
|
DataDirEnum::Multiple(dirs) => (|| {
|
||||||
|
@ -838,25 +827,12 @@ impl NodeStatus {
|
||||||
if dir.capacity.is_none() {
|
if dir.capacity.is_none() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_os = "freebsd"))]
|
|
||||||
match mount_avail(&dir.path) {
|
match mount_avail(&dir.path) {
|
||||||
Some((fsid, avail, total)) => {
|
Some((fsid, avail, total)) => {
|
||||||
mounts.insert(fsid, (avail, total));
|
mounts.insert(fsid, (avail, total));
|
||||||
}
|
}
|
||||||
None => return None,
|
None => return None,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "freebsd")]
|
|
||||||
match get_filesystem_id(&dir.path) {
|
|
||||||
Some(fsid) => match mount_avail(&dir.path) {
|
|
||||||
Some((_, avail, total)) => {
|
|
||||||
mounts.insert(fsid, (avail, total));
|
|
||||||
}
|
|
||||||
None => return None,
|
|
||||||
},
|
|
||||||
None => return None,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Some(
|
Some(
|
||||||
mounts
|
mounts
|
||||||
|
|
|
@ -27,10 +27,6 @@ pub struct Config {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub disable_scrub: bool,
|
pub disable_scrub: bool,
|
||||||
|
|
||||||
/// Use local timezone
|
|
||||||
#[serde(default)]
|
|
||||||
pub use_local_tz: bool,
|
|
||||||
|
|
||||||
/// Automatic snapshot interval for metadata
|
/// Automatic snapshot interval for metadata
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub metadata_auto_snapshot_interval: Option<String>,
|
pub metadata_auto_snapshot_interval: Option<String>,
|
||||||
|
|
Loading…
Reference in a new issue