Use bytes as capacity units

This commit is contained in:
Alex 2022-11-07 21:12:11 +01:00
parent fd5bc142b5
commit 73a4ca8b15
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
6 changed files with 67 additions and 53 deletions

1
Cargo.lock generated
View file

@ -1215,6 +1215,7 @@ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes", "bytes",
"bytesize",
"err-derive", "err-derive",
"futures", "futures",
"futures-util", "futures-util",

View file

@ -1,3 +1,5 @@
use bytesize::ByteSize;
use garage_util::crdt::Crdt; use garage_util::crdt::Crdt;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table; use garage_util::formater::format_table;
@ -86,7 +88,7 @@ pub async fn cmd_assign_role(
return Err(Error::Message( return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
} }
if args.capacity == Some(0) { if args.capacity == Some(ByteSize::b(0)) {
return Err(Error::Message("Invalid capacity value: 0".into())); return Err(Error::Message("Invalid capacity value: 0".into()));
} }
@ -94,7 +96,7 @@ pub async fn cmd_assign_role(
let new_entry = match roles.get(&added_node) { let new_entry = match roles.get(&added_node) {
Some(NodeRoleV(Some(old))) => { Some(NodeRoleV(Some(old))) => {
let capacity = match args.capacity { let capacity = match args.capacity {
Some(c) => Some(c), Some(c) => Some(c.as_u64()),
None if args.gateway => None, None if args.gateway => None,
None => old.capacity, None => old.capacity,
}; };
@ -111,7 +113,7 @@ pub async fn cmd_assign_role(
} }
_ => { _ => {
let capacity = match args.capacity { let capacity = match args.capacity {
Some(c) => Some(c), Some(c) => Some(c.as_u64()),
None if args.gateway => None, None if args.gateway => None,
None => return Err(Error::Message( None => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
@ -265,6 +267,7 @@ pub async fn cmd_config_layout(
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut did_something = false;
match config_opt.redundancy { match config_opt.redundancy {
None => (), None => (),
Some(r) => { Some(r) => {
@ -282,9 +285,16 @@ pub async fn cmd_config_layout(
.update(LayoutParameters { zone_redundancy: r }); .update(LayoutParameters { zone_redundancy: r });
println!("The new zone redundancy has been saved ({}).", r); println!("The new zone redundancy has been saved ({}).", r);
} }
did_something = true;
} }
} }
if !did_something {
return Err(Error::Message(
"Please specify an action for `garage layout config` to do".into(),
));
}
send_layout(rpc_cli, rpc_host, layout).await?; send_layout(rpc_cli, rpc_host, layout).await?;
Ok(()) Ok(())
} }
@ -335,7 +345,7 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
tags, tags,
role.zone, role.zone,
role.capacity_string(), role.capacity_string(),
usage as u32 * layout.partition_size, ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
(100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
)); ));
} }

View file

@ -114,9 +114,9 @@ pub struct AssignRoleOpt {
#[structopt(short = "z", long = "zone")] #[structopt(short = "z", long = "zone")]
pub(crate) zone: Option<String>, pub(crate) zone: Option<String>,
/// Capacity (in relative terms) /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB)
#[structopt(short = "c", long = "capacity")] #[structopt(short = "c", long = "capacity")]
pub(crate) capacity: Option<u32>, pub(crate) capacity: Option<bytesize::ByteSize>,
/// Gateway-only node /// Gateway-only node
#[structopt(short = "g", long = "gateway")] #[structopt(short = "g", long = "gateway")]

View file

@ -18,6 +18,7 @@ garage_util = { version = "0.8.0", path = "../util" }
arc-swap = "1.0" arc-swap = "1.0"
bytes = "1.0" bytes = "1.0"
bytesize = "1.1"
gethostname = "0.2" gethostname = "0.2"
hex = "0.4" hex = "0.4"
tracing = "0.1.30" tracing = "0.1.30"

View file

@ -23,8 +23,8 @@ pub enum Vertex {
/// Edge data structure for the flow algorithm. /// Edge data structure for the flow algorithm.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct FlowEdge { pub struct FlowEdge {
cap: u32, // flow maximal capacity of the edge cap: u64, // flow maximal capacity of the edge
flow: i32, // flow value on the edge flow: i64, // flow value on the edge
dest: usize, // destination vertex id dest: usize, // destination vertex id
rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v
} }
@ -32,7 +32,7 @@ pub struct FlowEdge {
/// Edge data structure for the detection of negative cycles. /// Edge data structure for the detection of negative cycles.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct WeightedEdge { pub struct WeightedEdge {
w: i32, // weight of the edge w: i64, // weight of the edge
dest: usize, dest: usize,
} }
@ -51,7 +51,7 @@ pub struct Graph<E: Edge> {
graph: Vec<Vec<E>>, graph: Vec<Vec<E>>,
} }
pub type CostFunction = HashMap<(Vertex, Vertex), i32>; pub type CostFunction = HashMap<(Vertex, Vertex), i64>;
impl<E: Edge> Graph<E> { impl<E: Edge> Graph<E> {
pub fn new(vertices: &[Vertex]) -> Self { pub fn new(vertices: &[Vertex]) -> Self {
@ -77,7 +77,7 @@ impl<E: Edge> Graph<E> {
impl Graph<FlowEdge> { impl Graph<FlowEdge> {
/// This function adds a directed edge to the graph with capacity c, and the /// This function adds a directed edge to the graph with capacity c, and the
/// corresponding reversed edge with capacity 0. /// corresponding reversed edge with capacity 0.
pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> { pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> {
let idu = self.get_vertex_id(&u)?; let idu = self.get_vertex_id(&u)?;
let idv = self.get_vertex_id(&v)?; let idv = self.get_vertex_id(&v)?;
if idu == idv { if idu == idv {
@ -115,7 +115,7 @@ impl Graph<FlowEdge> {
} }
/// This function returns the value of the flow incoming to v. /// This function returns the value of the flow incoming to v.
pub fn get_inflow(&self, v: Vertex) -> Result<i32, String> { pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> {
let idv = self.get_vertex_id(&v)?; let idv = self.get_vertex_id(&v)?;
let mut result = 0; let mut result = 0;
for edge in self.graph[idv].iter() { for edge in self.graph[idv].iter() {
@ -125,7 +125,7 @@ impl Graph<FlowEdge> {
} }
/// This function returns the value of the flow outgoing from v. /// This function returns the value of the flow outgoing from v.
pub fn get_outflow(&self, v: Vertex) -> Result<i32, String> { pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> {
let idv = self.get_vertex_id(&v)?; let idv = self.get_vertex_id(&v)?;
let mut result = 0; let mut result = 0;
for edge in self.graph[idv].iter() { for edge in self.graph[idv].iter() {
@ -136,7 +136,7 @@ impl Graph<FlowEdge> {
/// This function computes the flow total value by computing the outgoing flow /// This function computes the flow total value by computing the outgoing flow
/// from the source. /// from the source.
pub fn get_flow_value(&mut self) -> Result<i32, String> { pub fn get_flow_value(&mut self) -> Result<i64, String> {
self.get_outflow(Vertex::Source) self.get_outflow(Vertex::Source)
} }
@ -156,7 +156,7 @@ impl Graph<FlowEdge> {
} }
/// Computes an upper bound of the flow on the graph /// Computes an upper bound of the flow on the graph
pub fn flow_upper_bound(&self) -> Result<u32, String> { pub fn flow_upper_bound(&self) -> Result<u64, String> {
let idsource = self.get_vertex_id(&Vertex::Source)?; let idsource = self.get_vertex_id(&Vertex::Source)?;
let mut flow_upper_bound = 0; let mut flow_upper_bound = 0;
for edge in self.graph[idsource].iter() { for edge in self.graph[idsource].iter() {
@ -193,7 +193,7 @@ impl Graph<FlowEdge> {
// it means id has not yet been reached // it means id has not yet been reached
level[id] = Some(lvl); level[id] = Some(lvl);
for edge in self.graph[id].iter() { for edge in self.graph[id].iter() {
if edge.cap as i32 - edge.flow > 0 { if edge.cap as i64 - edge.flow > 0 {
fifo.push_back((edge.dest, lvl + 1)); fifo.push_back((edge.dest, lvl + 1));
} }
} }
@ -216,10 +216,10 @@ impl Graph<FlowEdge> {
lifo.pop(); lifo.pop();
while let Some((id, _)) = lifo.pop() { while let Some((id, _)) = lifo.pop() {
let nbd = next_nbd[id]; let nbd = next_nbd[id];
self.graph[id][nbd].flow += f as i32; self.graph[id][nbd].flow += f as i64;
let id_rev = self.graph[id][nbd].dest; let id_rev = self.graph[id][nbd].dest;
let nbd_rev = self.graph[id][nbd].rev; let nbd_rev = self.graph[id][nbd].rev;
self.graph[id_rev][nbd_rev].flow -= f as i32; self.graph[id_rev][nbd_rev].flow -= f as i64;
} }
lifo.push((idsource, flow_upper_bound)); lifo.push((idsource, flow_upper_bound));
continue; continue;
@ -236,9 +236,9 @@ impl Graph<FlowEdge> {
} }
// else we can try to send flow from id to its nbd // else we can try to send flow from id to its nbd
let new_flow = min( let new_flow = min(
f as i32, f as i64,
self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow,
) as u32; ) as u64;
if new_flow == 0 { if new_flow == 0 {
next_nbd[id] += 1; next_nbd[id] += 1;
continue; continue;
@ -302,7 +302,7 @@ impl Graph<FlowEdge> {
let nb_vertices = self.id_to_vertex.len(); let nb_vertices = self.id_to_vertex.len();
for i in 0..nb_vertices { for i in 0..nb_vertices {
for edge in self.graph[i].iter() { for edge in self.graph[i].iter() {
if edge.cap as i32 - edge.flow > 0 { if edge.cap as i64 - edge.flow > 0 {
// It is possible to send overflow through this edge // It is possible to send overflow through this edge
let u = self.id_to_vertex[i]; let u = self.id_to_vertex[i];
let v = self.id_to_vertex[edge.dest]; let v = self.id_to_vertex[edge.dest];
@ -322,7 +322,7 @@ impl Graph<FlowEdge> {
impl Graph<WeightedEdge> { impl Graph<WeightedEdge> {
/// This function adds a single directed weighted edge to the graph. /// This function adds a single directed weighted edge to the graph.
pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> { pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> {
let idu = self.get_vertex_id(&u)?; let idu = self.get_vertex_id(&u)?;
let idv = self.get_vertex_id(&v)?; let idv = self.get_vertex_id(&v)?;
self.graph[idu].push(WeightedEdge { w, dest: idv }); self.graph[idu].push(WeightedEdge { w, dest: idv });

View file

@ -2,7 +2,7 @@ use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use hex::ToHex; use bytesize::ByteSize;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -32,7 +32,7 @@ pub struct ClusterLayout {
/// This attribute is only used to retain the previously computed partition size, /// This attribute is only used to retain the previously computed partition size,
/// to know to what extent does it change with the layout update. /// to know to what extent does it change with the layout update.
pub partition_size: u32, pub partition_size: u64,
/// Parameters used to compute the assignation currently given by /// Parameters used to compute the assignation currently given by
/// ring_assignation_data /// ring_assignation_data
pub parameters: LayoutParameters, pub parameters: LayoutParameters,
@ -86,8 +86,7 @@ pub struct NodeRole {
/// The capacity of the node /// The capacity of the node
/// If this is set to None, the node does not participate in storing data for the system /// If this is set to None, the node does not participate in storing data for the system
/// and is only active as an API gateway to other nodes /// and is only active as an API gateway to other nodes
// TODO : change the capacity to u64 and use byte unit input/output pub capacity: Option<u64>,
pub capacity: Option<u32>,
/// A set of tags to recognize the node /// A set of tags to recognize the node
pub tags: Vec<String>, pub tags: Vec<String>,
} }
@ -95,7 +94,7 @@ pub struct NodeRole {
impl NodeRole { impl NodeRole {
pub fn capacity_string(&self) -> String { pub fn capacity_string(&self) -> String {
match self.capacity { match self.capacity {
Some(c) => format!("{}", c), Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(), None => "gateway".to_string(),
} }
} }
@ -264,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
} }
/// Given a node uuids, this function returns its capacity or fails if it does not have any /// Given a node uuids, this function returns its capacity or fails if it does not have any
pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u32, Error> { pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
match self.node_role(uuid) { match self.node_role(uuid) {
Some(NodeRole { Some(NodeRole {
capacity: Some(cap), capacity: Some(cap),
@ -300,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
} }
/// Returns the sum of capacities of non gateway nodes in the cluster /// Returns the sum of capacities of non gateway nodes in the cluster
pub fn get_total_capacity(&self) -> Result<u32, Error> { pub fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0; let mut total_capacity = 0;
for uuid in self.nongateway_nodes().iter() { for uuid in self.nongateway_nodes().iter() {
total_capacity += self.get_node_capacity(uuid)?; total_capacity += self.get_node_capacity(uuid)?;
@ -458,13 +457,14 @@ impl ClusterLayout {
if old_assignation_opt != None { if old_assignation_opt != None {
msg.push(format!( msg.push(format!(
"Optimal size of a partition: {} (was {} in the previous layout).", "Optimal size of a partition: {} (was {} in the previous layout).",
partition_size, self.partition_size ByteSize::b(partition_size).to_string_as(false),
ByteSize::b(self.partition_size).to_string_as(false)
)); ));
} else { } else {
msg.push(format!( msg.push(format!(
"Given the replication and redundancy constraints, the \ "Given the replication and redundancy constraints, the \
optimal size of a partition is {}.", optimal size of a partition is {}.",
partition_size ByteSize::b(partition_size).to_string_as(false)
)); ));
} }
// We write the partition size. // We write the partition size.
@ -613,7 +613,7 @@ impl ClusterLayout {
fn compute_optimal_partition_size( fn compute_optimal_partition_size(
&self, &self,
zone_to_id: &HashMap<String, usize>, zone_to_id: &HashMap<String, usize>,
) -> Result<u32, Error> { ) -> Result<u64, Error> {
let empty_set = HashSet::<(usize, usize)>::new(); let empty_set = HashSet::<(usize, usize)>::new();
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
g.compute_maximal_flow()?; g.compute_maximal_flow()?;
@ -672,7 +672,7 @@ impl ClusterLayout {
/// previous one. /// previous one.
fn generate_flow_graph( fn generate_flow_graph(
&self, &self,
partition_size: u32, partition_size: u64,
zone_to_id: &HashMap<String, usize>, zone_to_id: &HashMap<String, usize>,
exclude_assoc: &HashSet<(usize, usize)>, exclude_assoc: &HashSet<(usize, usize)>,
) -> Result<Graph<FlowEdge>, Error> { ) -> Result<Graph<FlowEdge>, Error> {
@ -682,18 +682,18 @@ impl ClusterLayout {
let nb_zones = zone_to_id.len(); let nb_zones = zone_to_id.len();
let redundancy = self.parameters.zone_redundancy; let redundancy = self.parameters.zone_redundancy;
for p in 0..NB_PARTITIONS { for p in 0..NB_PARTITIONS {
g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?;
g.add_edge( g.add_edge(
Vertex::Source, Vertex::Source,
Vertex::Pdown(p), Vertex::Pdown(p),
(self.replication_factor - redundancy) as u32, (self.replication_factor - redundancy) as u64,
)?; )?;
for z in 0..nb_zones { for z in 0..nb_zones {
g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
g.add_edge( g.add_edge(
Vertex::Pdown(p), Vertex::Pdown(p),
Vertex::PZ(p, z), Vertex::PZ(p, z),
self.replication_factor as u32, self.replication_factor as u64,
)?; )?;
} }
} }
@ -813,17 +813,19 @@ impl ClusterLayout {
) -> Result<Message, Error> { ) -> Result<Message, Error> {
let mut msg = Message::new(); let mut msg = Message::new();
let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32; let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
let total_cap = self.get_total_capacity()?; let total_cap = self.get_total_capacity()?;
let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
msg.push("".into()); msg.push("".into());
msg.push(format!( msg.push(format!(
"Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)",
used_cap, total_cap, percent_cap ByteSize::b(used_cap).to_string_as(false),
ByteSize::b(total_cap).to_string_as(false),
percent_cap
)); ));
msg.push("".into()); msg.push("".into());
msg.push( msg.push(
"If the percentage is to low, it might be that the \ "If the percentage is too low, it might be that the \
replication/redundancy constraints force the use of nodes/zones with small \ replication/redundancy constraints force the use of nodes/zones with small \
storage capacities. \ storage capacities. \
You might want to rebalance the storage capacities or relax the constraints. \ You might want to rebalance the storage capacities or relax the constraints. \
@ -833,9 +835,9 @@ impl ClusterLayout {
msg.push(format!( msg.push(format!(
"Recall that because of the replication factor, the actual available \ "Recall that because of the replication factor, the actual available \
storage capacity is {} / {} = {}.", storage capacity is {} / {} = {}.",
used_cap, ByteSize::b(used_cap).to_string_as(false),
self.replication_factor, self.replication_factor,
used_cap / self.replication_factor as u32 ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false)
)); ));
// We define and fill in the following tables // We define and fill in the following tables
@ -914,34 +916,34 @@ impl ClusterLayout {
replicated_partitions replicated_partitions
)); ));
let available_cap_z: u32 = self.partition_size * replicated_partitions as u32; let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
let mut total_cap_z = 0; let mut total_cap_z = 0;
for n in nodes_of_z.iter() { for n in nodes_of_z.iter() {
total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
} }
let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
msg.push(format!( msg.push(format!(
" Usable capacity / Total capacity: {}/{} ({:.1}%).", " Usable capacity / Total capacity: {} / {} ({:.1}%).",
available_cap_z, total_cap_z, percent_cap_z ByteSize::b(available_cap_z).to_string_as(false),
ByteSize::b(total_cap_z).to_string_as(false),
percent_cap_z
)); ));
for n in nodes_of_z.iter() { for n in nodes_of_z.iter() {
let available_cap_n = stored_partitions[*n] as u32 * self.partition_size; let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
let tags_n = (self let tags_n = (self
.node_role(&self.node_id_vec[*n]) .node_role(&self.node_id_vec[*n])
.ok_or("Node not found."))? .ok_or("Node not found."))?
.tags_string(); .tags_string();
msg.push(format!( msg.push(format!(
" Node {}: {} partitions ({} new) ; \ " Node {:?}: {} partitions ({} new) ; \
usable/total capacity: {} / {} ({:.1}%) ; tags:{}", usable/total capacity: {} / {} ({:.1}%) ; tags:{}",
&self.node_id_vec[*n].to_vec()[0..2] self.node_id_vec[*n],
.to_vec()
.encode_hex::<String>(),
stored_partitions[*n], stored_partitions[*n],
new_partitions[*n], new_partitions[*n],
available_cap_n, ByteSize::b(available_cap_n).to_string_as(false),
total_cap_n, ByteSize::b(total_cap_n).to_string_as(false),
(available_cap_n as f32) / (total_cap_n as f32) * 100.0, (available_cap_n as f32) / (total_cap_n as f32) * 100.0,
tags_n tags_n
)); ));
@ -1041,7 +1043,7 @@ mod tests {
fn update_layout( fn update_layout(
cl: &mut ClusterLayout, cl: &mut ClusterLayout,
node_id_vec: &Vec<u8>, node_id_vec: &Vec<u8>,
node_capacity_vec: &Vec<u32>, node_capacity_vec: &Vec<u64>,
node_zone_vec: &Vec<String>, node_zone_vec: &Vec<String>,
zone_redundancy: usize, zone_redundancy: usize,
) { ) {