admin api: move functions to their correct location
This commit is contained in:
parent
85a07c87d7
commit
6b420ff71d
4 changed files with 513 additions and 501 deletions
|
@ -1,11 +1,13 @@
|
||||||
|
use std::fmt::Write;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use format_table::format_table_to_string;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error as GarageError;
|
|
||||||
|
|
||||||
use garage_rpc::layout;
|
use garage_rpc::layout;
|
||||||
|
use garage_rpc::layout::PARTITION_BITS;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
@ -140,6 +142,108 @@ impl RequestHandler for GetClusterHealthRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for GetClusterStatisticsRequest {
|
||||||
|
type Response = GetClusterStatisticsResponse;
|
||||||
|
|
||||||
|
// FIXME: return this as a JSON struct instead of text
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterStatisticsResponse, Error> {
|
||||||
|
let mut ret = String::new();
|
||||||
|
|
||||||
|
// Gather storage node and free space statistics for current nodes
|
||||||
|
let layout = &garage.system.cluster_layout();
|
||||||
|
let mut node_partition_count = HashMap::<Uuid, u64>::new();
|
||||||
|
for short_id in layout.current().ring_assignment_data.iter() {
|
||||||
|
let id = layout.current().node_id_vec[*short_id as usize];
|
||||||
|
*node_partition_count.entry(id).or_default() += 1;
|
||||||
|
}
|
||||||
|
let node_info = garage
|
||||||
|
.system
|
||||||
|
.get_known_nodes()
|
||||||
|
.into_iter()
|
||||||
|
.map(|n| (n.id, n))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
|
||||||
|
for (id, parts) in node_partition_count.iter() {
|
||||||
|
let info = node_info.get(id);
|
||||||
|
let status = info.map(|x| &x.status);
|
||||||
|
let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
|
||||||
|
let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
|
||||||
|
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
|
||||||
|
let capacity = role
|
||||||
|
.map(|x| x.capacity_string())
|
||||||
|
.unwrap_or_else(|| "?".into());
|
||||||
|
let avail_str = |x| match x {
|
||||||
|
Some((avail, total)) => {
|
||||||
|
let pct = (avail as f64) / (total as f64) * 100.;
|
||||||
|
let avail = bytesize::ByteSize::b(avail);
|
||||||
|
let total = bytesize::ByteSize::b(total);
|
||||||
|
format!("{}/{} ({:.1}%)", avail, total, pct)
|
||||||
|
}
|
||||||
|
None => "?".into(),
|
||||||
|
};
|
||||||
|
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
|
||||||
|
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
|
||||||
|
table.push(format!(
|
||||||
|
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
|
||||||
|
id, hostname, zone, capacity, parts, data_avail, meta_avail
|
||||||
|
));
|
||||||
|
}
|
||||||
|
write!(
|
||||||
|
&mut ret,
|
||||||
|
"Storage nodes:\n{}",
|
||||||
|
format_table_to_string(table)
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let meta_part_avail = node_partition_count
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(id, parts)| {
|
||||||
|
node_info
|
||||||
|
.get(id)
|
||||||
|
.and_then(|x| x.status.meta_disk_avail)
|
||||||
|
.map(|c| c.0 / *parts)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let data_part_avail = node_partition_count
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(id, parts)| {
|
||||||
|
node_info
|
||||||
|
.get(id)
|
||||||
|
.and_then(|x| x.status.data_disk_avail)
|
||||||
|
.map(|c| c.0 / *parts)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
|
||||||
|
let meta_avail =
|
||||||
|
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
||||||
|
let data_avail =
|
||||||
|
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
||||||
|
writeln!(
|
||||||
|
&mut ret,
|
||||||
|
"\nEstimated available storage space cluster-wide (might be lower in practice):"
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
if meta_part_avail.len() < node_partition_count.len()
|
||||||
|
|| data_part_avail.len() < node_partition_count.len()
|
||||||
|
{
|
||||||
|
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
|
||||||
|
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
|
||||||
|
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
|
||||||
|
} else {
|
||||||
|
writeln!(&mut ret, " data: {}", data_avail).unwrap();
|
||||||
|
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(GetClusterStatisticsResponse { freeform: ret })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl RequestHandler for ConnectClusterNodesRequest {
|
impl RequestHandler for ConnectClusterNodesRequest {
|
||||||
type Response = ConnectClusterNodesResponse;
|
type Response = ConnectClusterNodesResponse;
|
||||||
|
|
||||||
|
@ -165,396 +269,3 @@ impl RequestHandler for ConnectClusterNodesRequest {
|
||||||
Ok(ConnectClusterNodesResponse(res))
|
Ok(ConnectClusterNodesResponse(res))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestHandler for GetClusterLayoutRequest {
|
|
||||||
type Response = GetClusterLayoutResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<GetClusterLayoutResponse, Error> {
|
|
||||||
Ok(format_cluster_layout(
|
|
||||||
garage.system.cluster_layout().inner(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
|
|
||||||
let current = layout.current();
|
|
||||||
|
|
||||||
let roles = current
|
|
||||||
.roles
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x)))
|
|
||||||
.map(|(k, v)| {
|
|
||||||
let stored_partitions = current.get_node_usage(k).ok().map(|x| x as u64);
|
|
||||||
LayoutNodeRole {
|
|
||||||
id: hex::encode(k),
|
|
||||||
zone: v.zone.clone(),
|
|
||||||
capacity: v.capacity,
|
|
||||||
stored_partitions,
|
|
||||||
usable_capacity: stored_partitions.map(|x| x * current.partition_size),
|
|
||||||
tags: v.tags.clone(),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let staged_role_changes = layout
|
|
||||||
.staging
|
|
||||||
.get()
|
|
||||||
.roles
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.filter(|(k, _, v)| current.roles.get(k) != Some(v))
|
|
||||||
.map(|(k, _, v)| match &v.0 {
|
|
||||||
None => NodeRoleChange {
|
|
||||||
id: hex::encode(k),
|
|
||||||
action: NodeRoleChangeEnum::Remove { remove: true },
|
|
||||||
},
|
|
||||||
Some(r) => NodeRoleChange {
|
|
||||||
id: hex::encode(k),
|
|
||||||
action: NodeRoleChangeEnum::Update(NodeAssignedRole {
|
|
||||||
zone: r.zone.clone(),
|
|
||||||
capacity: r.capacity,
|
|
||||||
tags: r.tags.clone(),
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let staged_parameters = if *layout.staging.get().parameters.get() != current.parameters {
|
|
||||||
Some((*layout.staging.get().parameters.get()).into())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
GetClusterLayoutResponse {
|
|
||||||
version: current.version,
|
|
||||||
roles,
|
|
||||||
partition_size: current.partition_size,
|
|
||||||
parameters: current.parameters.into(),
|
|
||||||
staged_role_changes,
|
|
||||||
staged_parameters,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RequestHandler for GetClusterLayoutHistoryRequest {
|
|
||||||
type Response = GetClusterLayoutHistoryResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<GetClusterLayoutHistoryResponse, Error> {
|
|
||||||
let layout_helper = garage.system.cluster_layout();
|
|
||||||
let layout = layout_helper.inner();
|
|
||||||
let min_stored = layout.min_stored();
|
|
||||||
|
|
||||||
let versions = layout
|
|
||||||
.versions
|
|
||||||
.iter()
|
|
||||||
.rev()
|
|
||||||
.chain(layout.old_versions.iter().rev())
|
|
||||||
.map(|ver| {
|
|
||||||
let status = if ver.version == layout.current().version {
|
|
||||||
ClusterLayoutVersionStatus::Current
|
|
||||||
} else if ver.version >= min_stored {
|
|
||||||
ClusterLayoutVersionStatus::Draining
|
|
||||||
} else {
|
|
||||||
ClusterLayoutVersionStatus::Historical
|
|
||||||
};
|
|
||||||
ClusterLayoutVersion {
|
|
||||||
version: ver.version,
|
|
||||||
status,
|
|
||||||
storage_nodes: ver
|
|
||||||
.roles
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.filter(
|
|
||||||
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()),
|
|
||||||
)
|
|
||||||
.count() as u64,
|
|
||||||
gateway_nodes: ver
|
|
||||||
.roles
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.filter(
|
|
||||||
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()),
|
|
||||||
)
|
|
||||||
.count() as u64,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let all_nodes = layout.get_all_nodes();
|
|
||||||
let min_ack = layout_helper.ack_map_min();
|
|
||||||
|
|
||||||
let update_trackers = if layout.versions.len() > 1 {
|
|
||||||
Some(
|
|
||||||
all_nodes
|
|
||||||
.iter()
|
|
||||||
.map(|node| {
|
|
||||||
(
|
|
||||||
hex::encode(&node),
|
|
||||||
NodeUpdateTrackers {
|
|
||||||
ack: layout.update_trackers.ack_map.get(node, min_stored),
|
|
||||||
sync: layout.update_trackers.sync_map.get(node, min_stored),
|
|
||||||
sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(GetClusterLayoutHistoryResponse {
|
|
||||||
current_version: layout.current().version,
|
|
||||||
min_ack,
|
|
||||||
versions,
|
|
||||||
update_trackers,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----
|
|
||||||
|
|
||||||
// ---- update functions ----
|
|
||||||
|
|
||||||
impl RequestHandler for UpdateClusterLayoutRequest {
|
|
||||||
type Response = UpdateClusterLayoutResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<UpdateClusterLayoutResponse, Error> {
|
|
||||||
let mut layout = garage.system.cluster_layout().inner().clone();
|
|
||||||
|
|
||||||
let mut roles = layout.current().roles.clone();
|
|
||||||
roles.merge(&layout.staging.get().roles);
|
|
||||||
|
|
||||||
for change in self.roles {
|
|
||||||
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
|
|
||||||
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
|
|
||||||
|
|
||||||
let new_role = match change.action {
|
|
||||||
NodeRoleChangeEnum::Remove { remove: true } => None,
|
|
||||||
NodeRoleChangeEnum::Update(NodeAssignedRole {
|
|
||||||
zone,
|
|
||||||
capacity,
|
|
||||||
tags,
|
|
||||||
}) => {
|
|
||||||
if matches!(capacity, Some(cap) if cap < 1024) {
|
|
||||||
return Err(Error::bad_request("Capacity should be at least 1K (1024)"));
|
|
||||||
}
|
|
||||||
Some(layout::NodeRole {
|
|
||||||
zone,
|
|
||||||
capacity,
|
|
||||||
tags,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
_ => return Err(Error::bad_request("Invalid layout change")),
|
|
||||||
};
|
|
||||||
|
|
||||||
layout
|
|
||||||
.staging
|
|
||||||
.get_mut()
|
|
||||||
.roles
|
|
||||||
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(param) = self.parameters {
|
|
||||||
if let ZoneRedundancy::AtLeast(r_int) = param.zone_redundancy {
|
|
||||||
if r_int > layout.current().replication_factor {
|
|
||||||
return Err(Error::bad_request(format!(
|
|
||||||
"The zone redundancy must be smaller or equal to the replication factor ({}).",
|
|
||||||
layout.current().replication_factor
|
|
||||||
)));
|
|
||||||
} else if r_int < 1 {
|
|
||||||
return Err(Error::bad_request(
|
|
||||||
"The zone redundancy must be at least 1.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
layout.staging.get_mut().parameters.update(param.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
garage
|
|
||||||
.system
|
|
||||||
.layout_manager
|
|
||||||
.update_cluster_layout(&layout)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let res = format_cluster_layout(&layout);
|
|
||||||
Ok(UpdateClusterLayoutResponse(res))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RequestHandler for PreviewClusterLayoutChangesRequest {
|
|
||||||
type Response = PreviewClusterLayoutChangesResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<PreviewClusterLayoutChangesResponse, Error> {
|
|
||||||
let layout = garage.system.cluster_layout().inner().clone();
|
|
||||||
let new_ver = layout.current().version + 1;
|
|
||||||
match layout.apply_staged_changes(new_ver) {
|
|
||||||
Err(GarageError::Message(error)) => {
|
|
||||||
Ok(PreviewClusterLayoutChangesResponse::Error { error })
|
|
||||||
}
|
|
||||||
Err(e) => Err(e.into()),
|
|
||||||
Ok((new_layout, msg)) => Ok(PreviewClusterLayoutChangesResponse::Success {
|
|
||||||
message: msg,
|
|
||||||
new_layout: format_cluster_layout(&new_layout),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RequestHandler for ApplyClusterLayoutRequest {
|
|
||||||
type Response = ApplyClusterLayoutResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<ApplyClusterLayoutResponse, Error> {
|
|
||||||
let layout = garage.system.cluster_layout().inner().clone();
|
|
||||||
let (layout, msg) = layout.apply_staged_changes(self.version)?;
|
|
||||||
|
|
||||||
garage
|
|
||||||
.system
|
|
||||||
.layout_manager
|
|
||||||
.update_cluster_layout(&layout)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(ApplyClusterLayoutResponse {
|
|
||||||
message: msg,
|
|
||||||
layout: format_cluster_layout(&layout),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RequestHandler for RevertClusterLayoutRequest {
|
|
||||||
type Response = RevertClusterLayoutResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<RevertClusterLayoutResponse, Error> {
|
|
||||||
let layout = garage.system.cluster_layout().inner().clone();
|
|
||||||
let layout = layout.revert_staged_changes()?;
|
|
||||||
garage
|
|
||||||
.system
|
|
||||||
.layout_manager
|
|
||||||
.update_cluster_layout(&layout)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let res = format_cluster_layout(&layout);
|
|
||||||
Ok(RevertClusterLayoutResponse(res))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RequestHandler for ClusterLayoutSkipDeadNodesRequest {
|
|
||||||
type Response = ClusterLayoutSkipDeadNodesResponse;
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<ClusterLayoutSkipDeadNodesResponse, Error> {
|
|
||||||
let status = garage.system.get_known_nodes();
|
|
||||||
|
|
||||||
let mut layout = garage.system.cluster_layout().inner().clone();
|
|
||||||
let mut ack_updated = vec![];
|
|
||||||
let mut sync_updated = vec![];
|
|
||||||
|
|
||||||
if layout.versions.len() == 1 {
|
|
||||||
return Err(Error::bad_request(
|
|
||||||
"This command cannot be called when there is only one live cluster layout version",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let min_v = layout.min_stored();
|
|
||||||
if self.version <= min_v || self.version > layout.current().version {
|
|
||||||
return Err(Error::bad_request(format!(
|
|
||||||
"Invalid version, you may use the following version numbers: {}",
|
|
||||||
(min_v + 1..=layout.current().version)
|
|
||||||
.map(|x| x.to_string())
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join(" ")
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let all_nodes = layout.get_all_nodes();
|
|
||||||
for node in all_nodes.iter() {
|
|
||||||
// Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
|
|
||||||
if self.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
|
|
||||||
if layout.update_trackers.ack_map.set_max(*node, self.version) {
|
|
||||||
ack_updated.push(hex::encode(node));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If --allow-missing-data, update SYNC tracker for all nodes.
|
|
||||||
if self.allow_missing_data {
|
|
||||||
if layout.update_trackers.sync_map.set_max(*node, self.version) {
|
|
||||||
sync_updated.push(hex::encode(node));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
garage
|
|
||||||
.system
|
|
||||||
.layout_manager
|
|
||||||
.update_cluster_layout(&layout)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(ClusterLayoutSkipDeadNodesResponse {
|
|
||||||
ack_updated,
|
|
||||||
sync_updated,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----
|
|
||||||
|
|
||||||
impl From<layout::ZoneRedundancy> for ZoneRedundancy {
|
|
||||||
fn from(x: layout::ZoneRedundancy) -> Self {
|
|
||||||
match x {
|
|
||||||
layout::ZoneRedundancy::Maximum => ZoneRedundancy::Maximum,
|
|
||||||
layout::ZoneRedundancy::AtLeast(x) => ZoneRedundancy::AtLeast(x),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Into<layout::ZoneRedundancy> for ZoneRedundancy {
|
|
||||||
fn into(self) -> layout::ZoneRedundancy {
|
|
||||||
match self {
|
|
||||||
ZoneRedundancy::Maximum => layout::ZoneRedundancy::Maximum,
|
|
||||||
ZoneRedundancy::AtLeast(x) => layout::ZoneRedundancy::AtLeast(x),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<layout::LayoutParameters> for LayoutParameters {
|
|
||||||
fn from(x: layout::LayoutParameters) -> Self {
|
|
||||||
LayoutParameters {
|
|
||||||
zone_redundancy: x.zone_redundancy.into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Into<layout::LayoutParameters> for LayoutParameters {
|
|
||||||
fn into(self) -> layout::LayoutParameters {
|
|
||||||
layout::LayoutParameters {
|
|
||||||
zone_redundancy: self.zone_redundancy.into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
406
src/api/admin/layout.rs
Normal file
406
src/api/admin/layout.rs
Normal file
|
@ -0,0 +1,406 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use garage_util::crdt::*;
|
||||||
|
use garage_util::data::*;
|
||||||
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
|
use garage_rpc::layout;
|
||||||
|
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
use crate::api::*;
|
||||||
|
use crate::error::*;
|
||||||
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
|
impl RequestHandler for GetClusterLayoutRequest {
|
||||||
|
type Response = GetClusterLayoutResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterLayoutResponse, Error> {
|
||||||
|
Ok(format_cluster_layout(
|
||||||
|
garage.system.cluster_layout().inner(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
|
||||||
|
let current = layout.current();
|
||||||
|
|
||||||
|
let roles = current
|
||||||
|
.roles
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x)))
|
||||||
|
.map(|(k, v)| {
|
||||||
|
let stored_partitions = current.get_node_usage(k).ok().map(|x| x as u64);
|
||||||
|
LayoutNodeRole {
|
||||||
|
id: hex::encode(k),
|
||||||
|
zone: v.zone.clone(),
|
||||||
|
capacity: v.capacity,
|
||||||
|
stored_partitions,
|
||||||
|
usable_capacity: stored_partitions.map(|x| x * current.partition_size),
|
||||||
|
tags: v.tags.clone(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let staged_role_changes = layout
|
||||||
|
.staging
|
||||||
|
.get()
|
||||||
|
.roles
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter(|(k, _, v)| current.roles.get(k) != Some(v))
|
||||||
|
.map(|(k, _, v)| match &v.0 {
|
||||||
|
None => NodeRoleChange {
|
||||||
|
id: hex::encode(k),
|
||||||
|
action: NodeRoleChangeEnum::Remove { remove: true },
|
||||||
|
},
|
||||||
|
Some(r) => NodeRoleChange {
|
||||||
|
id: hex::encode(k),
|
||||||
|
action: NodeRoleChangeEnum::Update(NodeAssignedRole {
|
||||||
|
zone: r.zone.clone(),
|
||||||
|
capacity: r.capacity,
|
||||||
|
tags: r.tags.clone(),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let staged_parameters = if *layout.staging.get().parameters.get() != current.parameters {
|
||||||
|
Some((*layout.staging.get().parameters.get()).into())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
GetClusterLayoutResponse {
|
||||||
|
version: current.version,
|
||||||
|
roles,
|
||||||
|
partition_size: current.partition_size,
|
||||||
|
parameters: current.parameters.into(),
|
||||||
|
staged_role_changes,
|
||||||
|
staged_parameters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for GetClusterLayoutHistoryRequest {
|
||||||
|
type Response = GetClusterLayoutHistoryResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterLayoutHistoryResponse, Error> {
|
||||||
|
let layout_helper = garage.system.cluster_layout();
|
||||||
|
let layout = layout_helper.inner();
|
||||||
|
let min_stored = layout.min_stored();
|
||||||
|
|
||||||
|
let versions = layout
|
||||||
|
.versions
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.chain(layout.old_versions.iter().rev())
|
||||||
|
.map(|ver| {
|
||||||
|
let status = if ver.version == layout.current().version {
|
||||||
|
ClusterLayoutVersionStatus::Current
|
||||||
|
} else if ver.version >= min_stored {
|
||||||
|
ClusterLayoutVersionStatus::Draining
|
||||||
|
} else {
|
||||||
|
ClusterLayoutVersionStatus::Historical
|
||||||
|
};
|
||||||
|
ClusterLayoutVersion {
|
||||||
|
version: ver.version,
|
||||||
|
status,
|
||||||
|
storage_nodes: ver
|
||||||
|
.roles
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter(
|
||||||
|
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()),
|
||||||
|
)
|
||||||
|
.count() as u64,
|
||||||
|
gateway_nodes: ver
|
||||||
|
.roles
|
||||||
|
.items()
|
||||||
|
.iter()
|
||||||
|
.filter(
|
||||||
|
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()),
|
||||||
|
)
|
||||||
|
.count() as u64,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let all_nodes = layout.get_all_nodes();
|
||||||
|
let min_ack = layout_helper.ack_map_min();
|
||||||
|
|
||||||
|
let update_trackers = if layout.versions.len() > 1 {
|
||||||
|
Some(
|
||||||
|
all_nodes
|
||||||
|
.iter()
|
||||||
|
.map(|node| {
|
||||||
|
(
|
||||||
|
hex::encode(&node),
|
||||||
|
NodeUpdateTrackers {
|
||||||
|
ack: layout.update_trackers.ack_map.get(node, min_stored),
|
||||||
|
sync: layout.update_trackers.sync_map.get(node, min_stored),
|
||||||
|
sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(GetClusterLayoutHistoryResponse {
|
||||||
|
current_version: layout.current().version,
|
||||||
|
min_ack,
|
||||||
|
versions,
|
||||||
|
update_trackers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
// ---- update functions ----
|
||||||
|
|
||||||
|
impl RequestHandler for UpdateClusterLayoutRequest {
|
||||||
|
type Response = UpdateClusterLayoutResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<UpdateClusterLayoutResponse, Error> {
|
||||||
|
let mut layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
|
||||||
|
let mut roles = layout.current().roles.clone();
|
||||||
|
roles.merge(&layout.staging.get().roles);
|
||||||
|
|
||||||
|
for change in self.roles {
|
||||||
|
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
|
||||||
|
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
|
||||||
|
|
||||||
|
let new_role = match change.action {
|
||||||
|
NodeRoleChangeEnum::Remove { remove: true } => None,
|
||||||
|
NodeRoleChangeEnum::Update(NodeAssignedRole {
|
||||||
|
zone,
|
||||||
|
capacity,
|
||||||
|
tags,
|
||||||
|
}) => {
|
||||||
|
if matches!(capacity, Some(cap) if cap < 1024) {
|
||||||
|
return Err(Error::bad_request("Capacity should be at least 1K (1024)"));
|
||||||
|
}
|
||||||
|
Some(layout::NodeRole {
|
||||||
|
zone,
|
||||||
|
capacity,
|
||||||
|
tags,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => return Err(Error::bad_request("Invalid layout change")),
|
||||||
|
};
|
||||||
|
|
||||||
|
layout
|
||||||
|
.staging
|
||||||
|
.get_mut()
|
||||||
|
.roles
|
||||||
|
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(param) = self.parameters {
|
||||||
|
if let ZoneRedundancy::AtLeast(r_int) = param.zone_redundancy {
|
||||||
|
if r_int > layout.current().replication_factor {
|
||||||
|
return Err(Error::bad_request(format!(
|
||||||
|
"The zone redundancy must be smaller or equal to the replication factor ({}).",
|
||||||
|
layout.current().replication_factor
|
||||||
|
)));
|
||||||
|
} else if r_int < 1 {
|
||||||
|
return Err(Error::bad_request(
|
||||||
|
"The zone redundancy must be at least 1.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
layout.staging.get_mut().parameters.update(param.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
garage
|
||||||
|
.system
|
||||||
|
.layout_manager
|
||||||
|
.update_cluster_layout(&layout)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let res = format_cluster_layout(&layout);
|
||||||
|
Ok(UpdateClusterLayoutResponse(res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for PreviewClusterLayoutChangesRequest {
|
||||||
|
type Response = PreviewClusterLayoutChangesResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<PreviewClusterLayoutChangesResponse, Error> {
|
||||||
|
let layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
let new_ver = layout.current().version + 1;
|
||||||
|
match layout.apply_staged_changes(new_ver) {
|
||||||
|
Err(GarageError::Message(error)) => {
|
||||||
|
Ok(PreviewClusterLayoutChangesResponse::Error { error })
|
||||||
|
}
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
Ok((new_layout, msg)) => Ok(PreviewClusterLayoutChangesResponse::Success {
|
||||||
|
message: msg,
|
||||||
|
new_layout: format_cluster_layout(&new_layout),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for ApplyClusterLayoutRequest {
|
||||||
|
type Response = ApplyClusterLayoutResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ApplyClusterLayoutResponse, Error> {
|
||||||
|
let layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
let (layout, msg) = layout.apply_staged_changes(self.version)?;
|
||||||
|
|
||||||
|
garage
|
||||||
|
.system
|
||||||
|
.layout_manager
|
||||||
|
.update_cluster_layout(&layout)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(ApplyClusterLayoutResponse {
|
||||||
|
message: msg,
|
||||||
|
layout: format_cluster_layout(&layout),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for RevertClusterLayoutRequest {
|
||||||
|
type Response = RevertClusterLayoutResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<RevertClusterLayoutResponse, Error> {
|
||||||
|
let layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
let layout = layout.revert_staged_changes()?;
|
||||||
|
garage
|
||||||
|
.system
|
||||||
|
.layout_manager
|
||||||
|
.update_cluster_layout(&layout)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let res = format_cluster_layout(&layout);
|
||||||
|
Ok(RevertClusterLayoutResponse(res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestHandler for ClusterLayoutSkipDeadNodesRequest {
|
||||||
|
type Response = ClusterLayoutSkipDeadNodesResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ClusterLayoutSkipDeadNodesResponse, Error> {
|
||||||
|
let status = garage.system.get_known_nodes();
|
||||||
|
|
||||||
|
let mut layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
let mut ack_updated = vec![];
|
||||||
|
let mut sync_updated = vec![];
|
||||||
|
|
||||||
|
if layout.versions.len() == 1 {
|
||||||
|
return Err(Error::bad_request(
|
||||||
|
"This command cannot be called when there is only one live cluster layout version",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let min_v = layout.min_stored();
|
||||||
|
if self.version <= min_v || self.version > layout.current().version {
|
||||||
|
return Err(Error::bad_request(format!(
|
||||||
|
"Invalid version, you may use the following version numbers: {}",
|
||||||
|
(min_v + 1..=layout.current().version)
|
||||||
|
.map(|x| x.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(" ")
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let all_nodes = layout.get_all_nodes();
|
||||||
|
for node in all_nodes.iter() {
|
||||||
|
// Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
|
||||||
|
if self.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
|
||||||
|
if layout.update_trackers.ack_map.set_max(*node, self.version) {
|
||||||
|
ack_updated.push(hex::encode(node));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If --allow-missing-data, update SYNC tracker for all nodes.
|
||||||
|
if self.allow_missing_data {
|
||||||
|
if layout.update_trackers.sync_map.set_max(*node, self.version) {
|
||||||
|
sync_updated.push(hex::encode(node));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
garage
|
||||||
|
.system
|
||||||
|
.layout_manager
|
||||||
|
.update_cluster_layout(&layout)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(ClusterLayoutSkipDeadNodesResponse {
|
||||||
|
ack_updated,
|
||||||
|
sync_updated,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
impl From<layout::ZoneRedundancy> for ZoneRedundancy {
|
||||||
|
fn from(x: layout::ZoneRedundancy) -> Self {
|
||||||
|
match x {
|
||||||
|
layout::ZoneRedundancy::Maximum => ZoneRedundancy::Maximum,
|
||||||
|
layout::ZoneRedundancy::AtLeast(x) => ZoneRedundancy::AtLeast(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<layout::ZoneRedundancy> for ZoneRedundancy {
|
||||||
|
fn into(self) -> layout::ZoneRedundancy {
|
||||||
|
match self {
|
||||||
|
ZoneRedundancy::Maximum => layout::ZoneRedundancy::Maximum,
|
||||||
|
ZoneRedundancy::AtLeast(x) => layout::ZoneRedundancy::AtLeast(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<layout::LayoutParameters> for LayoutParameters {
|
||||||
|
fn from(x: layout::LayoutParameters) -> Self {
|
||||||
|
LayoutParameters {
|
||||||
|
zone_redundancy: x.zone_redundancy.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Into<layout::LayoutParameters> for LayoutParameters {
|
||||||
|
fn into(self) -> layout::LayoutParameters {
|
||||||
|
layout::LayoutParameters {
|
||||||
|
zone_redundancy: self.zone_redundancy.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ mod router_v2;
|
||||||
mod bucket;
|
mod bucket;
|
||||||
mod cluster;
|
mod cluster;
|
||||||
mod key;
|
mod key;
|
||||||
|
mod layout;
|
||||||
mod special;
|
mod special;
|
||||||
|
|
||||||
mod block;
|
mod block;
|
||||||
|
|
|
@ -1,17 +1,13 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use format_table::format_table_to_string;
|
use format_table::format_table_to_string;
|
||||||
|
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
use garage_table::replication::*;
|
use garage_table::replication::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use garage_rpc::layout::PARTITION_BITS;
|
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::api::*;
|
use crate::api::*;
|
||||||
|
@ -114,108 +110,6 @@ impl RequestHandler for LocalGetNodeStatisticsRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestHandler for GetClusterStatisticsRequest {
|
|
||||||
type Response = GetClusterStatisticsResponse;
|
|
||||||
|
|
||||||
// FIXME: return this as a JSON struct instead of text
|
|
||||||
async fn handle(
|
|
||||||
self,
|
|
||||||
garage: &Arc<Garage>,
|
|
||||||
_admin: &Admin,
|
|
||||||
) -> Result<GetClusterStatisticsResponse, Error> {
|
|
||||||
let mut ret = String::new();
|
|
||||||
|
|
||||||
// Gather storage node and free space statistics for current nodes
|
|
||||||
let layout = &garage.system.cluster_layout();
|
|
||||||
let mut node_partition_count = HashMap::<Uuid, u64>::new();
|
|
||||||
for short_id in layout.current().ring_assignment_data.iter() {
|
|
||||||
let id = layout.current().node_id_vec[*short_id as usize];
|
|
||||||
*node_partition_count.entry(id).or_default() += 1;
|
|
||||||
}
|
|
||||||
let node_info = garage
|
|
||||||
.system
|
|
||||||
.get_known_nodes()
|
|
||||||
.into_iter()
|
|
||||||
.map(|n| (n.id, n))
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
|
|
||||||
for (id, parts) in node_partition_count.iter() {
|
|
||||||
let info = node_info.get(id);
|
|
||||||
let status = info.map(|x| &x.status);
|
|
||||||
let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
|
|
||||||
let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
|
|
||||||
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
|
|
||||||
let capacity = role
|
|
||||||
.map(|x| x.capacity_string())
|
|
||||||
.unwrap_or_else(|| "?".into());
|
|
||||||
let avail_str = |x| match x {
|
|
||||||
Some((avail, total)) => {
|
|
||||||
let pct = (avail as f64) / (total as f64) * 100.;
|
|
||||||
let avail = bytesize::ByteSize::b(avail);
|
|
||||||
let total = bytesize::ByteSize::b(total);
|
|
||||||
format!("{}/{} ({:.1}%)", avail, total, pct)
|
|
||||||
}
|
|
||||||
None => "?".into(),
|
|
||||||
};
|
|
||||||
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
|
|
||||||
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
|
|
||||||
table.push(format!(
|
|
||||||
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
|
|
||||||
id, hostname, zone, capacity, parts, data_avail, meta_avail
|
|
||||||
));
|
|
||||||
}
|
|
||||||
write!(
|
|
||||||
&mut ret,
|
|
||||||
"Storage nodes:\n{}",
|
|
||||||
format_table_to_string(table)
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let meta_part_avail = node_partition_count
|
|
||||||
.iter()
|
|
||||||
.filter_map(|(id, parts)| {
|
|
||||||
node_info
|
|
||||||
.get(id)
|
|
||||||
.and_then(|x| x.status.meta_disk_avail)
|
|
||||||
.map(|c| c.0 / *parts)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
let data_part_avail = node_partition_count
|
|
||||||
.iter()
|
|
||||||
.filter_map(|(id, parts)| {
|
|
||||||
node_info
|
|
||||||
.get(id)
|
|
||||||
.and_then(|x| x.status.data_disk_avail)
|
|
||||||
.map(|c| c.0 / *parts)
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
|
|
||||||
let meta_avail =
|
|
||||||
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
|
||||||
let data_avail =
|
|
||||||
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
|
|
||||||
writeln!(
|
|
||||||
&mut ret,
|
|
||||||
"\nEstimated available storage space cluster-wide (might be lower in practice):"
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
if meta_part_avail.len() < node_partition_count.len()
|
|
||||||
|| data_part_avail.len() < node_partition_count.len()
|
|
||||||
{
|
|
||||||
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
|
|
||||||
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
|
|
||||||
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
|
|
||||||
} else {
|
|
||||||
writeln!(&mut ret, " data: {}", data_avail).unwrap();
|
|
||||||
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(GetClusterStatisticsResponse { freeform: ret })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
|
fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
|
||||||
where
|
where
|
||||||
F: TableSchema + 'static,
|
F: TableSchema + 'static,
|
||||||
|
|
Loading…
Add table
Reference in a new issue