K2V PollRange, version 2 #471

Merged
lx merged 18 commits from k2v-watch-range-2 into main 2023-01-26 16:19:05 +00:00
17 changed files with 275 additions and 27 deletions
Showing only changes of commit dac254a6e7 - Show all commits

1
.envrc Normal file
View file

@ -0,0 +1 @@
use flake

1
.gitignore vendored
View file

@ -3,3 +3,4 @@
/pki /pki
**/*.rs.bk **/*.rs.bk
*.swp *.swp
/.direnv

1
Cargo.lock generated
View file

@ -1276,6 +1276,7 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"lazy_static", "lazy_static",
"mktemp",
"netapp", "netapp",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",

View file

@ -32,7 +32,7 @@ args@{
ignoreLockHash, ignoreLockHash,
}: }:
let let
nixifiedLockHash = "8f036894ab81a528f76e97e904ff3e496a9b1500569312489d444f615fb781bf"; nixifiedLockHash = "bd3d90904731a6f067b8a535cfe9b84c9727d17eb1cdd499336b6a627dc7c426";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash lockHashIgnored = if ignoreLockHash
@ -1820,6 +1820,9 @@ in
tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }).out; tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }).out;
xxhash_rust = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".xxhash-rust."0.8.4" { inherit profileName; }).out; xxhash_rust = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".xxhash-rust."0.8.4" { inherit profileName; }).out;
}; };
devDependencies = {
mktemp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.4.1" { inherit profileName; }).out;
};
}); });
"unknown".garage_web."0.8.1" = overridableMkRustCrate (profileName: rec { "unknown".garage_web."0.8.1" = overridableMkRustCrate (profileName: rec {

View file

@ -39,7 +39,7 @@ Now you can enter our nix-shell, all the required packages will be downloaded bu
nix-shell nix-shell
``` ```
You can use the traditionnal Rust development workflow: You can use the traditional Rust development workflow:
```bash ```bash
cargo build # compile the project cargo build # compile the project

View file

@ -96,7 +96,7 @@ Performance characteristics of the different DB engines are as follows:
- Sled: the default database engine, which tends to produce - Sled: the default database engine, which tends to produce
large data files and also has performance issues, especially when the metadata folder large data files and also has performance issues, especially when the metadata folder
is on a traditionnal HDD and not on SSD. is on a traditional HDD and not on SSD.
- LMDB: the recommended alternative on 64-bit systems, - LMDB: the recommended alternative on 64-bit systems,
much more space-efficiant and slightly faster. Note that the data format of LMDB is not portable much more space-efficiant and slightly faster. Note that the data format of LMDB is not portable
between architectures, so for instance the Garage database of an x86-64 between architectures, so for instance the Garage database of an x86-64
@ -267,6 +267,10 @@ This key should be specified here in the form of a 32-byte hex-encoded
random string. Such a string can be generated with a command random string. Such a string can be generated with a command
such as `openssl rand -hex 32`. such as `openssl rand -hex 32`.
### `rpc_secret_file`
Like `rpc_secret` above, just that this is the path to a file that Garage will try to read the secret from.
### `rpc_bind_addr` ### `rpc_bind_addr`
The address and port on which to bind for inter-cluster communcations The address and port on which to bind for inter-cluster communcations

View file

@ -55,6 +55,21 @@
"type": "github" "type": "github"
} }
}, },
"flake-utils_2": {
"locked": {
"lastModified": 1667395993,
"narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1665657542, "lastModified": 1665657542,
@ -74,6 +89,7 @@
"root": { "root": {
"inputs": { "inputs": {
"cargo2nix": "cargo2nix", "cargo2nix": "cargo2nix",
"flake-utils": "flake-utils_2",
"nixpkgs": "nixpkgs" "nixpkgs": "nixpkgs"
} }
}, },

View file

@ -7,22 +7,30 @@
url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36"; url = "github:Alexis211/cargo2nix/a7a61179b66054904ef6a195d8da736eaaa06c36";
inputs.nixpkgs.follows = "nixpkgs"; inputs.nixpkgs.follows = "nixpkgs";
}; };
inputs.flake-utils.url = "github:numtide/flake-utils";
outputs = { self, nixpkgs, cargo2nix }: let outputs = { self, nixpkgs, cargo2nix, flake-utils }:
let
git_version = self.lastModifiedDate; git_version = self.lastModifiedDate;
compile = import ./nix/compile.nix; compile = import ./nix/compile.nix;
forAllSystems = nixpkgs.lib.genAttrs nixpkgs.lib.systems.flakeExposed; in flake-utils.lib.eachDefaultSystem (system:
in let pkgs = nixpkgs.legacyPackages.${system};
{ in {
packages = forAllSystems (system: { packages = {
default = (compile { default = (compile {
inherit system git_version; inherit system git_version;
pkgsSrc = nixpkgs; pkgsSrc = nixpkgs;
cargo2nixOverlay = cargo2nix.overlays.default; cargo2nixOverlay = cargo2nix.overlays.default;
release = true; release = true;
}).workspace.garage { }).workspace.garage { compileMode = "build"; };
compileMode = "build";
}; };
devShell = ((compile {
inherit system git_version;
pkgsSrc = nixpkgs;
cargo2nixOverlay = cargo2nix.overlays.default;
release = false;
}).workspaceShell {
packages = [ pkgs.rustfmt cargo2nix.packages.${system}.default ];
});
}); });
};
} }

View file

@ -129,8 +129,12 @@ impl BlockManager {
.netapp .netapp
.endpoint("garage_block/manager.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
let metrics = let metrics = BlockManagerMetrics::new(
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");

View file

@ -5,6 +5,7 @@ use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics { pub struct BlockManagerMetrics {
pub(crate) _compression_level: ValueObserver<u64>,
pub(crate) _rc_size: ValueObserver<u64>, pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>, pub(crate) _resync_errored_blocks: ValueObserver<u64>,
@ -25,9 +26,23 @@ pub struct BlockManagerMetrics {
} }
impl BlockManagerMetrics { impl BlockManagerMetrics {
pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self { pub fn new(
compression_level: Option<i32>,
rc_tree: db::Tree,
resync_queue: CountedTree,
resync_errors: CountedTree,
) -> Self {
let meter = global::meter("garage_model/block"); let meter = global::meter("garage_model/block");
Self { Self {
_compression_level: meter
.u64_value_observer("block.compression_level", move |observer| {
match compression_level {
Some(v) => observer.observe(v as u64, &[]),
None => observer.observe(0 as u64, &[]),
}
})
.with_description("Garage compression level for node")
.init(),
_rc_size: meter _rc_size: meter
.u64_value_observer("block.rc_size", move |observer| { .u64_value_observer("block.rc_size", move |observer| {
if let Ok(Some(v)) = rc_tree.fast_len() { if let Ok(Some(v)) = rc_tree.fast_len() {

View file

@ -173,7 +173,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let net_key_hex_str = opt let net_key_hex_str = opt
.rpc_secret .rpc_secret
.as_ref() .as_ref()
.or_else(|| config.as_ref().map(|c| &c.rpc_secret)) .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref()))
.ok_or("No RPC secret provided")?; .ok_or("No RPC secret provided")?;
let network_key = NetworkKey::from_slice( let network_key = NetworkKey::from_slice(
&hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],

View file

@ -159,7 +159,7 @@ impl Garage {
}; };
let network_key = NetworkKey::from_slice( let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], &hex::decode(&config.rpc_secret.as_ref().unwrap()).expect("Invalid RPC secret key")[..],
) )
.expect("Invalid RPC secret key"); .expect("Invalid RPC secret key");

View file

@ -17,3 +17,5 @@ mod metrics;
pub mod rpc_helper; pub mod rpc_helper;
pub use rpc_helper::*; pub use rpc_helper::*;
pub mod system_metrics;

View file

@ -38,6 +38,9 @@ use crate::replication_mode::*;
use crate::ring::*; use crate::ring::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
#[cfg(feature = "metrics")]
use crate::system_metrics::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
@ -103,6 +106,8 @@ pub struct System {
consul_discovery: Option<ConsulDiscovery>, consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
#[cfg(feature = "metrics")]
metrics: SystemMetrics,
replication_mode: ReplicationMode, replication_mode: ReplicationMode,
replication_factor: usize, replication_factor: usize,
@ -275,6 +280,9 @@ impl System {
cluster_layout_staging_hash: cluster_layout.staging_hash, cluster_layout_staging_hash: cluster_layout.staging_hash,
}; };
#[cfg(feature = "metrics")]
let metrics = SystemMetrics::new(replication_factor);
let ring = Ring::new(cluster_layout, replication_factor); let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring)); let (update_ring, ring) = watch::channel(Arc::new(ring));
@ -365,6 +373,8 @@ impl System {
consul_discovery, consul_discovery,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(), kubernetes_discovery: config.kubernetes_discovery.clone(),
#[cfg(feature = "metrics")]
metrics,
ring, ring,
update_ring: Mutex::new(update_ring), update_ring: Mutex::new(update_ring),

33
src/rpc/system_metrics.rs Normal file
View file

@ -0,0 +1,33 @@
use opentelemetry::{global, metrics::*, KeyValue};
/// TableMetrics reference all counter used for metrics
pub struct SystemMetrics {
pub(crate) _garage_build_info: ValueObserver<u64>,
pub(crate) _replication_factor: ValueObserver<u64>,
}
impl SystemMetrics {
pub fn new(replication_factor: usize) -> Self {
let meter = global::meter("garage_system");
Self {
_garage_build_info: meter
.u64_value_observer("garage_build_info", move |observer| {
observer.observe(
1,
&[KeyValue::new(
"version",
garage_util::version::garage_version(),
)],
)
})
.with_description("Garage build info")
.init(),
_replication_factor: meter
.u64_value_observer("garage_replication_factor", move |observer| {
observer.observe(replication_factor as u64, &[])
})
.with_description("Garage replication factor setting")
.init(),
}
}
}

View file

@ -47,6 +47,8 @@ hyper = "0.14"
opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] } opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
[dev-dependencies]
mktemp = "0.4"
[features] [features]
k2v = [] k2v = []

View file

@ -34,7 +34,11 @@ pub struct Config {
pub compression_level: Option<i32>, pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded /// RPC secret key: 32 bytes hex encoded
pub rpc_secret: String, /// Note: When using `read_config` this should never be `None`
pub rpc_secret: Option<String>,
/// Optional file where RPC secret key is read from
pub rpc_secret_file: Option<String>,
/// Address to bind for RPC /// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
@ -177,7 +181,31 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut config = String::new(); let mut config = String::new();
file.read_to_string(&mut config)?; file.read_to_string(&mut config)?;
Ok(toml::from_str(&config)?) let mut parsed_config: Config = toml::from_str(&config)?;
match (&parsed_config.rpc_secret, &parsed_config.rpc_secret_file) {
(Some(_), None) => {
// no-op
}
(Some(_), Some(_)) => {
return Err("only one of `rpc_secret` and `rpc_secret_file` can be set".into())
}
(None, Some(rpc_secret_file_path_string)) => {
let mut rpc_secret_file = std::fs::OpenOptions::new()
.read(true)
.open(rpc_secret_file_path_string)?;
let mut rpc_secret_from_file = String::new();
rpc_secret_file.read_to_string(&mut rpc_secret_from_file)?;
// trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
// also editors sometimes add a trailing newline
parsed_config.rpc_secret = Some(String::from(rpc_secret_from_file.trim_end()));
}
(None, None) => {
return Err("either `rpc_secret` or `rpc_secret_file` needs to be set".into())
}
};
Ok(parsed_config)
} }
fn default_compression() -> Option<i32> { fn default_compression() -> Option<i32> {
@ -233,3 +261,123 @@ where
deserializer.deserialize_any(OptionVisitor) deserializer.deserialize_any(OptionVisitor)
} }
#[cfg(test)]
mod tests {
use crate::error::Error;
use std::fs::File;
use std::io::Write;
#[test]
fn test_rpc_secret_is_required() -> Result<(), Error> {
let path1 = mktemp::Temp::new_file()?;
let mut file1 = File::create(path1.as_path())?;
writeln!(
file1,
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
"#
)?;
assert_eq!(
"either `rpc_secret` or `rpc_secret_file` needs to be set",
super::read_config(path1.to_path_buf())
.unwrap_err()
.to_string()
);
drop(path1);
drop(file1);
let path2 = mktemp::Temp::new_file()?;
let mut file2 = File::create(path2.as_path())?;
writeln!(
file2,
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret = "foo"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
"#
)?;
let config = super::read_config(path2.to_path_buf())?;
assert_eq!("foo", config.rpc_secret.unwrap());
drop(path2);
drop(file2);
Ok(())
}
#[test]
fn test_rpc_secret_file_works() -> Result<(), Error> {
let path_secret = mktemp::Temp::new_file()?;
let mut file_secret = File::create(path_secret.as_path())?;
writeln!(file_secret, "foo")?;
drop(file_secret);
let path_config = mktemp::Temp::new_file()?;
let mut file_config = File::create(path_config.as_path())?;
let path_secret_path = path_secret.as_path().display();
writeln!(
file_config,
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{path_secret_path}"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
"#
)?;
let config = super::read_config(path_config.to_path_buf())?;
assert_eq!("foo", config.rpc_secret.unwrap());
drop(path_config);
drop(path_secret);
drop(file_config);
Ok(())
}
#[test]
fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
let path_config = mktemp::Temp::new_file()?;
let mut file_config = File::create(path_config.as_path())?;
writeln!(
file_config,
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
"#
)?;
assert_eq!(
"only one of `rpc_secret` and `rpc_secret_file` can be set",
super::read_config(path_config.to_path_buf())
.unwrap_err()
.to_string()
);
drop(path_config);
drop(file_config);
Ok(())
}
}