Report available disk space in garage stats
#487
8 changed files with 220 additions and 35 deletions
15
Cargo.lock
generated
15
Cargo.lock
generated
|
@ -1230,6 +1230,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"systemstat",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -3567,6 +3568,20 @@ dependencies = [
|
||||||
"unicode-xid",
|
"unicode-xid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "systemstat"
|
||||||
|
version = "0.2.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a24aec24a9312c83999a28e3ef9db7e2afd5c64bf47725b758cdc1cafd5b0bd2"
|
||||||
|
dependencies = [
|
||||||
|
"bytesize",
|
||||||
|
"lazy_static",
|
||||||
|
"libc",
|
||||||
|
"nom",
|
||||||
|
"time 0.3.9",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tempfile"
|
name = "tempfile"
|
||||||
version = "3.3.0"
|
version = "3.3.0"
|
||||||
|
|
19
Cargo.nix
19
Cargo.nix
|
@ -32,7 +32,7 @@ args@{
|
||||||
ignoreLockHash,
|
ignoreLockHash,
|
||||||
}:
|
}:
|
||||||
let
|
let
|
||||||
nixifiedLockHash = "8461dcfb984a8d042fecb5745d5da17912135dbf2a8ef7e6c3ae8e64c03d9744";
|
nixifiedLockHash = "e59ef222aaaada125e2a5fccbd215b545740b2abd21ce381c42a7a6e3a7e672a";
|
||||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||||
lockHashIgnored = if ignoreLockHash
|
lockHashIgnored = if ignoreLockHash
|
||||||
|
@ -1753,6 +1753,7 @@ in
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.81" { inherit profileName; }).out;
|
serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.81" { inherit profileName; }).out;
|
||||||
|
systemstat = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".systemstat."0.2.3" { inherit profileName; }).out;
|
||||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
||||||
tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }).out;
|
tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }).out;
|
||||||
tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }).out;
|
tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }).out;
|
||||||
|
@ -4917,6 +4918,21 @@ in
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".systemstat."0.2.3" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "systemstat";
|
||||||
|
version = "0.2.3";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "a24aec24a9312c83999a28e3ef9db7e2afd5c64bf47725b758cdc1cafd5b0bd2"; };
|
||||||
|
dependencies = {
|
||||||
|
bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" { inherit profileName; }).out;
|
||||||
|
lazy_static = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }).out;
|
||||||
|
libc = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }).out;
|
||||||
|
${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "nom" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.1" { inherit profileName; }).out;
|
||||||
|
time = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".time."0.3.9" { inherit profileName; }).out;
|
||||||
|
${ if hostPlatform.isWindows then "winapi" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; }).out;
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".tempfile."3.3.0" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".tempfile."3.3.0" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "tempfile";
|
name = "tempfile";
|
||||||
version = "3.3.0";
|
version = "3.3.0";
|
||||||
|
@ -5890,6 +5906,7 @@ in
|
||||||
[ "ntsecapi" ]
|
[ "ntsecapi" ]
|
||||||
[ "ntstatus" ]
|
[ "ntstatus" ]
|
||||||
[ "objbase" ]
|
[ "objbase" ]
|
||||||
|
[ "pdh" ]
|
||||||
[ "processenv" ]
|
[ "processenv" ]
|
||||||
[ "processthreadsapi" ]
|
[ "processthreadsapi" ]
|
||||||
[ "profileapi" ]
|
[ "profileapi" ]
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -4,7 +4,7 @@ all:
|
||||||
clear; cargo build
|
clear; cargo build
|
||||||
|
|
||||||
release:
|
release:
|
||||||
nix-build --arg release true
|
nix-build --attr pkgs.amd64.release --no-build-output
|
||||||
|
|
||||||
shell:
|
shell:
|
||||||
nix-shell
|
nix-shell
|
||||||
|
|
|
@ -15,6 +15,7 @@ use garage_util::time::*;
|
||||||
use garage_table::replication::*;
|
use garage_table::replication::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
|
use garage_rpc::ring::PARTITION_BITS;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
use garage_block::manager::BlockResyncErrorInfo;
|
use garage_block::manager::BlockResyncErrorInfo;
|
||||||
|
@ -783,6 +784,7 @@ impl AdminRpcHandler {
|
||||||
for node in ring.layout.node_ids().iter() {
|
for node in ring.layout.node_ids().iter() {
|
||||||
let mut opt = opt.clone();
|
let mut opt = opt.clone();
|
||||||
opt.all_nodes = false;
|
opt.all_nodes = false;
|
||||||
|
opt.skip_global = true;
|
||||||
|
|
||||||
writeln!(&mut ret, "\n======================").unwrap();
|
writeln!(&mut ret, "\n======================").unwrap();
|
||||||
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
|
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
|
||||||
|
@ -799,6 +801,15 @@ impl AdminRpcHandler {
|
||||||
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
|
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writeln!(&mut ret, "\n======================").unwrap();
|
||||||
|
write!(
|
||||||
|
&mut ret,
|
||||||
|
"Cluster statistics:\n\n{}",
|
||||||
|
self.gather_cluster_stats()
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
Ok(AdminRpc::Ok(ret))
|
Ok(AdminRpc::Ok(ret))
|
||||||
} else {
|
} else {
|
||||||
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
|
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
|
||||||
|
@ -819,22 +830,6 @@ impl AdminRpcHandler {
|
||||||
|
|
||||||
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
|
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
|
||||||
|
|
||||||
// Gather ring statistics
|
|
||||||
let ring = self.garage.system.ring.borrow().clone();
|
|
||||||
let mut ring_nodes = HashMap::new();
|
|
||||||
for (_i, loc) in ring.partitions().iter() {
|
|
||||||
for n in ring.get_nodes(loc, ring.replication_factor).iter() {
|
|
||||||
if !ring_nodes.contains_key(n) {
|
|
||||||
ring_nodes.insert(*n, 0usize);
|
|
||||||
}
|
|
||||||
*ring_nodes.get_mut(n).unwrap() += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writeln!(&mut ret, "\nRing nodes & partition count:").unwrap();
|
|
||||||
for (n, c) in ring_nodes.iter() {
|
|
||||||
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gather table statistics
|
// Gather table statistics
|
||||||
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
|
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
|
||||||
table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
|
table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
|
||||||
|
@ -881,12 +876,108 @@ impl AdminRpcHandler {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
if !opt.detailed {
|
if !opt.detailed {
|
||||||
writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
|
writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
if !opt.skip_global {
|
||||||
|
write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn gather_cluster_stats(&self) -> String {
|
||||||
|
let mut ret = String::new();
|
||||||
|
|
||||||
|
// Gather storage node and free space statistics
|
||||||
|
let layout = &self.garage.system.ring.borrow().layout;
|
||||||
|
let mut node_partition_count = HashMap::<Uuid, u64>::new();
|
||||||
|
for short_id in layout.ring_assignation_data.iter() {
|
||||||
|
let id = layout.node_id_vec[*short_id as usize];
|
||||||
|
*node_partition_count.entry(id).or_default() += 1;
|
||||||
|
}
|
||||||
|
let node_info = self
|
||||||
|
.garage
|
||||||
|
.system
|
||||||
|
.get_known_nodes()
|
||||||
|
.into_iter()
|
||||||
|
.map(|n| (n.id, n))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
|
||||||
|
for (id, parts) in node_partition_count.iter() {
|
||||||
|
let info = node_info.get(id);
|
||||||
|
let status = info.map(|x| &x.status);
|
||||||
|
let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
|
||||||
|
let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
|
||||||
|
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
|
||||||
|
let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into());
|
||||||
|
let avail_str = |x| match x {
|
||||||
|
Some((avail, total)) => {
|
||||||
|
let pct = (avail as f64) / (total as f64) * 100.;
|
||||||
|
let avail = bytesize::ByteSize::b(avail);
|
||||||
|
let total = bytesize::ByteSize::b(total);
|
||||||
|
format!("{}/{} ({:.1}%)", avail, total, pct)
|
||||||
|
}
|
||||||
|
None => "?".into(),
|
||||||
|
};
|
||||||
|
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
|
||||||
|
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
|
||||||
|
table.push(format!(
|
||||||
|
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
|
||||||
|
id, hostname, zone, capacity, parts, data_avail, meta_avail
|
||||||
|
));
|
||||||
|
}
|
||||||
|
write!(
|
||||||
|
&mut ret,
|
||||||
|
"Storage nodes:\n{}",
|
||||||
|
format_table_to_string(table)
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let meta_part_avail = node_partition_count
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(id, parts)| {
|
||||||
|
node_info
|
||||||
|
.get(id)
|
||||||
|
.and_then(|x| x.status.meta_disk_avail)
|
||||||
|
.map(|c| c.0 / *parts)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let data_part_avail = node_partition_count
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(id, parts)| {
|
||||||
|
node_info
|
||||||
|
.get(id)
|
||||||
|
.and_then(|x| x.status.data_disk_avail)
|
||||||
|
.map(|c| c.0 / *parts)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
|
||||||
|
let meta_avail =
|
||||||
|
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
||||||
|
let data_avail =
|
||||||
|
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
||||||
|
writeln!(
|
||||||
|
&mut ret,
|
||||||
|
"\nEstimated available storage space cluster-wide (might be lower in practice):"
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
if meta_part_avail.len() < node_partition_count.len()
|
||||||
|
|| data_part_avail.len() < node_partition_count.len()
|
||||||
|
{
|
||||||
|
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
|
||||||
|
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
|
||||||
|
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
|
||||||
|
} else {
|
||||||
|
writeln!(&mut ret, " data: {}", data_avail).unwrap();
|
||||||
|
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
fn gather_table_stats<F, R>(
|
fn gather_table_stats<F, R>(
|
||||||
&self,
|
&self,
|
||||||
t: &Arc<Table<F, R>>,
|
t: &Arc<Table<F, R>>,
|
||||||
|
|
|
@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
|
||||||
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||||
|
|
||||||
println!("==== HEALTHY NODES ====");
|
println!("==== HEALTHY NODES ====");
|
||||||
let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
|
let mut healthy_nodes =
|
||||||
|
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail\tMetaAvail".to_string()];
|
||||||
for adv in status.iter().filter(|adv| adv.is_up) {
|
for adv in status.iter().filter(|adv| adv.is_up) {
|
||||||
match layout.roles.get(&adv.id) {
|
match layout.roles.get(&adv.id) {
|
||||||
Some(NodeRoleV(Some(cfg))) => {
|
Some(NodeRoleV(Some(cfg))) => {
|
||||||
|
let data_avail = match &adv.status.data_disk_avail {
|
||||||
|
_ if cfg.capacity.is_none() => "N/A".into(),
|
||||||
|
Some((avail, total)) => {
|
||||||
|
let pct = (*avail as f64) / (*total as f64) * 100.;
|
||||||
|
let avail = bytesize::ByteSize::b(*avail);
|
||||||
|
format!("{} ({:.1}%)", avail, pct)
|
||||||
|
}
|
||||||
|
None => "?".into(),
|
||||||
|
};
|
||||||
healthy_nodes.push(format!(
|
healthy_nodes.push(format!(
|
||||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
|
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
|
||||||
id = adv.id,
|
id = adv.id,
|
||||||
host = adv.status.hostname,
|
host = adv.status.hostname,
|
||||||
addr = adv.addr,
|
addr = adv.addr,
|
||||||
tags = cfg.tags.join(","),
|
tags = cfg.tags.join(","),
|
||||||
zone = cfg.zone,
|
zone = cfg.zone,
|
||||||
capacity = cfg.capacity_string(),
|
capacity = cfg.capacity_string(),
|
||||||
|
data_avail = data_avail,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
|
|
|
@ -504,6 +504,11 @@ pub struct StatsOpt {
|
||||||
/// Gather detailed statistics (this can be long)
|
/// Gather detailed statistics (this can be long)
|
||||||
#[structopt(short = "d", long = "detailed")]
|
#[structopt(short = "d", long = "detailed")]
|
||||||
pub detailed: bool,
|
pub detailed: bool,
|
||||||
|
|
||||||
|
/// Don't show global cluster stats (internal use in RPC)
|
||||||
|
#[structopt(skip)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub skip_global: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
|
||||||
|
|
|
@ -23,6 +23,7 @@ hex = "0.4"
|
||||||
tracing = "0.1.30"
|
tracing = "0.1.30"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
||||||
|
systemstat = "0.2.3"
|
||||||
|
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
|
|
|
@ -118,18 +118,28 @@ pub struct System {
|
||||||
|
|
||||||
/// Path to metadata directory
|
/// Path to metadata directory
|
||||||
pub metadata_dir: PathBuf,
|
pub metadata_dir: PathBuf,
|
||||||
|
/// Path to data directory
|
||||||
|
pub data_dir: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct NodeStatus {
|
pub struct NodeStatus {
|
||||||
/// Hostname of the node
|
/// Hostname of the node
|
||||||
pub hostname: String,
|
pub hostname: String,
|
||||||
|
|
||||||
/// Replication factor configured on the node
|
/// Replication factor configured on the node
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
/// Cluster layout version
|
/// Cluster layout version
|
||||||
pub cluster_layout_version: u64,
|
pub cluster_layout_version: u64,
|
||||||
/// Hash of cluster layout staging data
|
/// Hash of cluster layout staging data
|
||||||
pub cluster_layout_staging_hash: Hash,
|
pub cluster_layout_staging_hash: Hash,
|
||||||
|
|
||||||
|
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
|
||||||
|
#[serde(default)]
|
||||||
|
pub meta_disk_avail: Option<(u64, u64)>,
|
||||||
|
/// Disk usage on partition containing data directory (tuple: `(avail, total)`)
|
||||||
|
#[serde(default)]
|
||||||
|
pub data_disk_avail: Option<(u64, u64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
@ -271,14 +281,8 @@ impl System {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let local_status = NodeStatus {
|
let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
|
||||||
hostname: gethostname::gethostname()
|
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
|
||||||
.into_string()
|
|
||||||
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
|
||||||
replication_factor,
|
|
||||||
cluster_layout_version: cluster_layout.version,
|
|
||||||
cluster_layout_staging_hash: cluster_layout.staging_hash,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
let metrics = SystemMetrics::new(replication_factor);
|
let metrics = SystemMetrics::new(replication_factor);
|
||||||
|
@ -379,6 +383,7 @@ impl System {
|
||||||
ring,
|
ring,
|
||||||
update_ring: Mutex::new(update_ring),
|
update_ring: Mutex::new(update_ring),
|
||||||
metadata_dir: config.metadata_dir.clone(),
|
metadata_dir: config.metadata_dir.clone(),
|
||||||
|
data_dir: config.data_dir.clone(),
|
||||||
});
|
});
|
||||||
sys.system_endpoint.set_handler(sys.clone());
|
sys.system_endpoint.set_handler(sys.clone());
|
||||||
Ok(sys)
|
Ok(sys)
|
||||||
|
@ -416,12 +421,7 @@ impl System {
|
||||||
.get(&n.id.into())
|
.get(&n.id.into())
|
||||||
.cloned()
|
.cloned()
|
||||||
.map(|(_, st)| st)
|
.map(|(_, st)| st)
|
||||||
.unwrap_or(NodeStatus {
|
.unwrap_or(NodeStatus::unknown()),
|
||||||
hostname: "?".to_string(),
|
|
||||||
replication_factor: 0,
|
|
||||||
cluster_layout_version: 0,
|
|
||||||
cluster_layout_staging_hash: Hash::from([0u8; 32]),
|
|
||||||
}),
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
known_nodes
|
known_nodes
|
||||||
|
@ -600,6 +600,9 @@ impl System {
|
||||||
let ring = self.ring.borrow();
|
let ring = self.ring.borrow();
|
||||||
new_si.cluster_layout_version = ring.layout.version;
|
new_si.cluster_layout_version = ring.layout.version;
|
||||||
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
|
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
|
||||||
|
|
||||||
|
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir);
|
||||||
|
|
||||||
self.local_status.swap(Arc::new(new_si));
|
self.local_status.swap(Arc::new(new_si));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -864,6 +867,48 @@ impl EndpointHandler<SystemRpc> for System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl NodeStatus {
|
||||||
|
fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
|
||||||
|
NodeStatus {
|
||||||
|
hostname: gethostname::gethostname()
|
||||||
|
.into_string()
|
||||||
|
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
||||||
|
replication_factor,
|
||||||
|
cluster_layout_version: layout.version,
|
||||||
|
cluster_layout_staging_hash: layout.staging_hash,
|
||||||
|
meta_disk_avail: None,
|
||||||
|
data_disk_avail: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unknown() -> Self {
|
||||||
|
NodeStatus {
|
||||||
|
hostname: "?".to_string(),
|
||||||
|
replication_factor: 0,
|
||||||
|
cluster_layout_version: 0,
|
||||||
|
cluster_layout_staging_hash: Hash::from([0u8; 32]),
|
||||||
|
meta_disk_avail: None,
|
||||||
|
data_disk_avail: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path) {
|
||||||
|
use systemstat::{Platform, System};
|
||||||
|
let mounts = System::new().mounts().unwrap_or_default();
|
||||||
|
|
||||||
|
let mount_avail = |path: &Path| {
|
||||||
|
mounts
|
||||||
|
.iter()
|
||||||
|
.filter(|x| path.starts_with(&x.fs_mounted_on))
|
||||||
|
.max_by_key(|x| x.fs_mounted_on.len())
|
||||||
|
.map(|x| (x.avail.as_u64(), x.total.as_u64()))
|
||||||
|
};
|
||||||
|
|
||||||
|
self.meta_disk_avail = mount_avail(meta_dir);
|
||||||
|
self.data_disk_avail = mount_avail(data_dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn get_default_ip() -> Option<IpAddr> {
|
fn get_default_ip() -> Option<IpAddr> {
|
||||||
pnet_datalink::interfaces()
|
pnet_datalink::interfaces()
|
||||||
.iter()
|
.iter()
|
||||||
|
|
Loading…
Reference in a new issue