From 73a4ca8b1515f95bf7860fc292c12db83d3c6228 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 7 Nov 2022 21:12:11 +0100 Subject: [PATCH] Use bytes as capacity units --- Cargo.lock | 1 + src/garage/cli/layout.rs | 18 +++++++++--- src/garage/cli/structs.rs | 4 +-- src/rpc/Cargo.toml | 1 + src/rpc/graph_algo.rs | 34 ++++++++++----------- src/rpc/layout.rs | 62 ++++++++++++++++++++------------------- 6 files changed, 67 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 75c25628..c9f63a19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1215,6 +1215,7 @@ dependencies = [ "arc-swap", "async-trait", "bytes", + "bytesize", "err-derive", "futures", "futures-util", diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 4b23a096..85af345a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,3 +1,5 @@ +use bytesize::ByteSize; + use garage_util::crdt::Crdt; use garage_util::error::*; use garage_util::formater::format_table; @@ -86,7 +88,7 @@ pub async fn cmd_assign_role( return Err(Error::Message( "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); } - if args.capacity == Some(0) { + if args.capacity == Some(ByteSize::b(0)) { return Err(Error::Message("Invalid capacity value: 0".into())); } @@ -94,7 +96,7 @@ pub async fn cmd_assign_role( let new_entry = match roles.get(&added_node) { Some(NodeRoleV(Some(old))) => { let capacity = match args.capacity { - Some(c) => Some(c), + Some(c) => Some(c.as_u64()), None if args.gateway => None, None => old.capacity, }; @@ -111,7 +113,7 @@ pub async fn cmd_assign_role( } _ => { let capacity = match args.capacity { - Some(c) => Some(c), + Some(c) => Some(c.as_u64()), None if args.gateway => None, None => return Err(Error::Message( "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), @@ -265,6 +267,7 @@ pub async fn cmd_config_layout( ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + let mut did_something = false; match config_opt.redundancy { None => (), Some(r) => { @@ -282,9 +285,16 @@ pub async fn cmd_config_layout( .update(LayoutParameters { zone_redundancy: r }); println!("The new zone redundancy has been saved ({}).", r); } + did_something = true; } } + if !did_something { + return Err(Error::Message( + "Please specify an action for `garage layout config` to do".into(), + )); + } + send_layout(rpc_cli, rpc_host, layout).await?; Ok(()) } @@ -335,7 +345,7 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { tags, role.zone, role.capacity_string(), - usage as u32 * layout.partition_size, + ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false), (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) )); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 64798952..49a1f267 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -114,9 +114,9 @@ pub struct AssignRoleOpt { #[structopt(short = "z", long = "zone")] pub(crate) zone: Option, - /// Capacity (in relative terms) + /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB) #[structopt(short = "c", long = "capacity")] - pub(crate) capacity: Option, + pub(crate) capacity: Option, /// Gateway-only node #[structopt(short = "g", long = "gateway")] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 5a427131..1b411c6a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -18,6 +18,7 @@ garage_util = { version = "0.8.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" +bytesize = "1.1" gethostname = "0.2" hex = "0.4" tracing = "0.1.30" diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs index 1e4a819b..f181e2ba 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/graph_algo.rs @@ -23,8 +23,8 @@ pub enum Vertex { /// Edge data structure for the flow algorithm. #[derive(Clone, Copy, Debug)] pub struct FlowEdge { - cap: u32, // flow maximal capacity of the edge - flow: i32, // flow value on the edge + cap: u64, // flow maximal capacity of the edge + flow: i64, // flow value on the edge dest: usize, // destination vertex id rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v } @@ -32,7 +32,7 @@ pub struct FlowEdge { /// Edge data structure for the detection of negative cycles. #[derive(Clone, Copy, Debug)] pub struct WeightedEdge { - w: i32, // weight of the edge + w: i64, // weight of the edge dest: usize, } @@ -51,7 +51,7 @@ pub struct Graph { graph: Vec>, } -pub type CostFunction = HashMap<(Vertex, Vertex), i32>; +pub type CostFunction = HashMap<(Vertex, Vertex), i64>; impl Graph { pub fn new(vertices: &[Vertex]) -> Self { @@ -77,7 +77,7 @@ impl Graph { impl Graph { /// This function adds a directed edge to the graph with capacity c, and the /// corresponding reversed edge with capacity 0. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u32) -> Result<(), String> { + pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> { let idu = self.get_vertex_id(&u)?; let idv = self.get_vertex_id(&v)?; if idu == idv { @@ -115,7 +115,7 @@ impl Graph { } /// This function returns the value of the flow incoming to v. - pub fn get_inflow(&self, v: Vertex) -> Result { + pub fn get_inflow(&self, v: Vertex) -> Result { let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { @@ -125,7 +125,7 @@ impl Graph { } /// This function returns the value of the flow outgoing from v. - pub fn get_outflow(&self, v: Vertex) -> Result { + pub fn get_outflow(&self, v: Vertex) -> Result { let idv = self.get_vertex_id(&v)?; let mut result = 0; for edge in self.graph[idv].iter() { @@ -136,7 +136,7 @@ impl Graph { /// This function computes the flow total value by computing the outgoing flow /// from the source. - pub fn get_flow_value(&mut self) -> Result { + pub fn get_flow_value(&mut self) -> Result { self.get_outflow(Vertex::Source) } @@ -156,7 +156,7 @@ impl Graph { } /// Computes an upper bound of the flow on the graph - pub fn flow_upper_bound(&self) -> Result { + pub fn flow_upper_bound(&self) -> Result { let idsource = self.get_vertex_id(&Vertex::Source)?; let mut flow_upper_bound = 0; for edge in self.graph[idsource].iter() { @@ -193,7 +193,7 @@ impl Graph { // it means id has not yet been reached level[id] = Some(lvl); for edge in self.graph[id].iter() { - if edge.cap as i32 - edge.flow > 0 { + if edge.cap as i64 - edge.flow > 0 { fifo.push_back((edge.dest, lvl + 1)); } } @@ -216,10 +216,10 @@ impl Graph { lifo.pop(); while let Some((id, _)) = lifo.pop() { let nbd = next_nbd[id]; - self.graph[id][nbd].flow += f as i32; + self.graph[id][nbd].flow += f as i64; let id_rev = self.graph[id][nbd].dest; let nbd_rev = self.graph[id][nbd].rev; - self.graph[id_rev][nbd_rev].flow -= f as i32; + self.graph[id_rev][nbd_rev].flow -= f as i64; } lifo.push((idsource, flow_upper_bound)); continue; @@ -236,9 +236,9 @@ impl Graph { } // else we can try to send flow from id to its nbd let new_flow = min( - f as i32, - self.graph[id][nbd].cap as i32 - self.graph[id][nbd].flow, - ) as u32; + f as i64, + self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow, + ) as u64; if new_flow == 0 { next_nbd[id] += 1; continue; @@ -302,7 +302,7 @@ impl Graph { let nb_vertices = self.id_to_vertex.len(); for i in 0..nb_vertices { for edge in self.graph[i].iter() { - if edge.cap as i32 - edge.flow > 0 { + if edge.cap as i64 - edge.flow > 0 { // It is possible to send overflow through this edge let u = self.id_to_vertex[i]; let v = self.id_to_vertex[edge.dest]; @@ -322,7 +322,7 @@ impl Graph { impl Graph { /// This function adds a single directed weighted edge to the graph. - pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i32) -> Result<(), String> { + pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> { let idu = self.get_vertex_id(&u)?; let idv = self.get_vertex_id(&v)?; self.graph[idu].push(WeightedEdge { w, dest: idv }); diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 15765662..3c80b213 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -2,7 +2,7 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; -use hex::ToHex; +use bytesize::ByteSize; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -32,7 +32,7 @@ pub struct ClusterLayout { /// This attribute is only used to retain the previously computed partition size, /// to know to what extent does it change with the layout update. - pub partition_size: u32, + pub partition_size: u64, /// Parameters used to compute the assignation currently given by /// ring_assignation_data pub parameters: LayoutParameters, @@ -86,8 +86,7 @@ pub struct NodeRole { /// The capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes - // TODO : change the capacity to u64 and use byte unit input/output - pub capacity: Option, + pub capacity: Option, /// A set of tags to recognize the node pub tags: Vec, } @@ -95,7 +94,7 @@ pub struct NodeRole { impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { - Some(c) => format!("{}", c), + Some(c) => ByteSize::b(c).to_string_as(false), None => "gateway".to_string(), } } @@ -264,7 +263,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Given a node uuids, this function returns its capacity or fails if it does not have any - pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { + pub fn get_node_capacity(&self, uuid: &Uuid) -> Result { match self.node_role(uuid) { Some(NodeRole { capacity: Some(cap), @@ -300,7 +299,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } /// Returns the sum of capacities of non gateway nodes in the cluster - pub fn get_total_capacity(&self) -> Result { + pub fn get_total_capacity(&self) -> Result { let mut total_capacity = 0; for uuid in self.nongateway_nodes().iter() { total_capacity += self.get_node_capacity(uuid)?; @@ -458,13 +457,14 @@ impl ClusterLayout { if old_assignation_opt != None { msg.push(format!( "Optimal size of a partition: {} (was {} in the previous layout).", - partition_size, self.partition_size + ByteSize::b(partition_size).to_string_as(false), + ByteSize::b(self.partition_size).to_string_as(false) )); } else { msg.push(format!( "Given the replication and redundancy constraints, the \ optimal size of a partition is {}.", - partition_size + ByteSize::b(partition_size).to_string_as(false) )); } // We write the partition size. @@ -613,7 +613,7 @@ impl ClusterLayout { fn compute_optimal_partition_size( &self, zone_to_id: &HashMap, - ) -> Result { + ) -> Result { let empty_set = HashSet::<(usize, usize)>::new(); let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?; g.compute_maximal_flow()?; @@ -672,7 +672,7 @@ impl ClusterLayout { /// previous one. fn generate_flow_graph( &self, - partition_size: u32, + partition_size: u64, zone_to_id: &HashMap, exclude_assoc: &HashSet<(usize, usize)>, ) -> Result, Error> { @@ -682,18 +682,18 @@ impl ClusterLayout { let nb_zones = zone_to_id.len(); let redundancy = self.parameters.zone_redundancy; for p in 0..NB_PARTITIONS { - g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?; + g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?; g.add_edge( Vertex::Source, Vertex::Pdown(p), - (self.replication_factor - redundancy) as u32, + (self.replication_factor - redundancy) as u64, )?; for z in 0..nb_zones { g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?; g.add_edge( Vertex::Pdown(p), Vertex::PZ(p, z), - self.replication_factor as u32, + self.replication_factor as u64, )?; } } @@ -813,17 +813,19 @@ impl ClusterLayout { ) -> Result { let mut msg = Message::new(); - let used_cap = self.partition_size * NB_PARTITIONS as u32 * self.replication_factor as u32; + let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64; let total_cap = self.get_total_capacity()?; let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32); msg.push("".into()); msg.push(format!( "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)", - used_cap, total_cap, percent_cap + ByteSize::b(used_cap).to_string_as(false), + ByteSize::b(total_cap).to_string_as(false), + percent_cap )); msg.push("".into()); msg.push( - "If the percentage is to low, it might be that the \ + "If the percentage is too low, it might be that the \ replication/redundancy constraints force the use of nodes/zones with small \ storage capacities. \ You might want to rebalance the storage capacities or relax the constraints. \ @@ -833,9 +835,9 @@ impl ClusterLayout { msg.push(format!( "Recall that because of the replication factor, the actual available \ storage capacity is {} / {} = {}.", - used_cap, + ByteSize::b(used_cap).to_string_as(false), self.replication_factor, - used_cap / self.replication_factor as u32 + ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false) )); // We define and fill in the following tables @@ -914,34 +916,34 @@ impl ClusterLayout { replicated_partitions )); - let available_cap_z: u32 = self.partition_size * replicated_partitions as u32; + let available_cap_z: u64 = self.partition_size * replicated_partitions as u64; let mut total_cap_z = 0; for n in nodes_of_z.iter() { total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?; } let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32); msg.push(format!( - " Usable capacity / Total capacity: {}/{} ({:.1}%).", - available_cap_z, total_cap_z, percent_cap_z + " Usable capacity / Total capacity: {} / {} ({:.1}%).", + ByteSize::b(available_cap_z).to_string_as(false), + ByteSize::b(total_cap_z).to_string_as(false), + percent_cap_z )); for n in nodes_of_z.iter() { - let available_cap_n = stored_partitions[*n] as u32 * self.partition_size; + let available_cap_n = stored_partitions[*n] as u64 * self.partition_size; let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?; let tags_n = (self .node_role(&self.node_id_vec[*n]) .ok_or("Node not found."))? .tags_string(); msg.push(format!( - " Node {}: {} partitions ({} new) ; \ + " Node {:?}: {} partitions ({} new) ; \ usable/total capacity: {} / {} ({:.1}%) ; tags:{}", - &self.node_id_vec[*n].to_vec()[0..2] - .to_vec() - .encode_hex::(), + self.node_id_vec[*n], stored_partitions[*n], new_partitions[*n], - available_cap_n, - total_cap_n, + ByteSize::b(available_cap_n).to_string_as(false), + ByteSize::b(total_cap_n).to_string_as(false), (available_cap_n as f32) / (total_cap_n as f32) * 100.0, tags_n )); @@ -1041,7 +1043,7 @@ mod tests { fn update_layout( cl: &mut ClusterLayout, node_id_vec: &Vec, - node_capacity_vec: &Vec, + node_capacity_vec: &Vec, node_zone_vec: &Vec, zone_redundancy: usize, ) {