diff --git a/doc/api/garage-admin-v2.json b/doc/api/garage-admin-v2.json index e7b42620..97de3a71 100644 --- a/doc/api/garage-admin-v2.json +++ b/doc/api/garage-admin-v2.json @@ -157,6 +157,40 @@ } } }, + "/v2/ClusterLayoutSkipDeadNodes": { + "post": { + "tags": [ + "Cluster layout" + ], + "description": "Force progress in layout update trackers", + "operationId": "ClusterLayoutSkipDeadNodes", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Request has been taken into account", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesResponse" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, "/v2/ConnectClusterNodes": { "post": { "tags": [ @@ -512,6 +546,30 @@ } } }, + "/v2/GetClusterLayoutHistory": { + "get": { + "tags": [ + "Cluster layout" + ], + "description": "\nReturns the history of layouts in the cluster\n ", + "operationId": "GetClusterLayoutHistory", + "responses": { + "200": { + "description": "Cluster layout history", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GetClusterLayoutHistoryResponse" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, "/v2/GetClusterStatistics": { "get": { "tags": [ @@ -950,6 +1008,30 @@ } } }, + "/v2/PreviewClusterLayoutChanges": { + "post": { + "tags": [ + "Cluster layout" + ], + "description": "\nComputes a new layout taking into account the staged parameters, and returns it with detailed statistics. The new layout is not applied in the cluster.\n\n*Note: do not try to parse the `message` field of the response, it is given as an array of string specifically because its format is not stable.*\n ", + "operationId": "PreviewClusterLayoutChanges", + "responses": { + "200": { + "description": "Information about the new layout", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PreviewClusterLayoutChangesResponse" + } + } + } + }, + "500": { + "description": "Internal server error" + } + } + } + }, "/v2/PurgeBlocks": { "post": { "tags": [ @@ -1330,6 +1412,7 @@ "version": { "type": "integer", "format": "int64", + "description": "As a safety measure, the new version number of the layout must\nbe specified here", "minimum": 0 } } @@ -1342,13 +1425,15 @@ ], "properties": { "layout": { - "$ref": "#/components/schemas/GetClusterLayoutResponse" + "$ref": "#/components/schemas/GetClusterLayoutResponse", + "description": "Details about the new cluster layout" }, "message": { "type": "array", "items": { "type": "string" - } + }, + "description": "Plain-text information about the layout computation\n(do not try to parse this)" } } }, @@ -1576,6 +1661,89 @@ } } }, + "ClusterLayoutSkipDeadNodesRequest": { + "type": "object", + "required": [ + "version", + "allowMissingData" + ], + "properties": { + "allowMissingData": { + "type": "boolean", + "description": "Allow the skip even if a quorum of nodes could not be found for\nthe data among the remaining nodes" + }, + "version": { + "type": "integer", + "format": "int64", + "description": "Version number of the layout to assume is currently up-to-date.\nThis will generally be the current layout version.", + "minimum": 0 + } + } + }, + "ClusterLayoutSkipDeadNodesResponse": { + "type": "object", + "required": [ + "ackUpdated", + "syncUpdated" + ], + 
"properties": { + "ackUpdated": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Nodes for which the ACK update tracker has been updated to `version`" + }, + "syncUpdated": { + "type": "array", + "items": { + "type": "string" + }, + "description": "If `allow_missing_data` is set,\nnodes for which the SYNC update tracker has been updated to `version`" + } + } + }, + "ClusterLayoutVersion": { + "type": "object", + "required": [ + "version", + "status", + "storageNodes", + "gatewayNodes" + ], + "properties": { + "gatewayNodes": { + "type": "integer", + "format": "int64", + "description": "Number of nodes with a gateway role in this layout version", + "minimum": 0 + }, + "status": { + "$ref": "#/components/schemas/ClusterLayoutVersionStatus", + "description": "Status of this layout version" + }, + "storageNodes": { + "type": "integer", + "format": "int64", + "description": "Number of nodes with an assigned storage capacity in this layout version", + "minimum": 0 + }, + "version": { + "type": "integer", + "format": "int64", + "description": "Version number of this layout version", + "minimum": 0 + } + } + }, + "ClusterLayoutVersionStatus": { + "type": "string", + "enum": [ + "Current", + "Draining", + "Historical" + ] + }, "ConnectClusterNodesRequest": { "type": "array", "items": { @@ -1679,11 +1847,13 @@ "available": { "type": "integer", "format": "int64", + "description": "Number of bytes available", "minimum": 0 }, "total": { "type": "integer", "format": "int64", + "description": "Total number of bytes", "minimum": 0 } } @@ -1870,29 +2040,97 @@ } } }, + "GetClusterLayoutHistoryResponse": { + "type": "object", + "required": [ + "currentVersion", + "minAck", + "versions" + ], + "properties": { + "currentVersion": { + "type": "integer", + "format": "int64", + "description": "The current version number of the cluster layout", + "minimum": 0 + }, + "minAck": { + "type": "integer", + "format": "int64", + "description": "All nodes in the cluster are aware of layout versions up to\nthis version number (at least)", + "minimum": 0 + }, + "updateTrackers": { + "type": [ + "object", + "null" + ], + "description": "Detailed update trackers for nodes (see\n`https://garagehq.deuxfleurs.fr/blog/2023-12-preserving-read-after-write-consistency/`)", + "additionalProperties": { + "$ref": "#/components/schemas/NodeUpdateTrackers" + }, + "propertyNames": { + "type": "string" + } + }, + "versions": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ClusterLayoutVersion" + }, + "description": "Layout version history" + } + } + }, "GetClusterLayoutResponse": { "type": "object", "required": [ "version", "roles", + "parameters", + "partitionSize", "stagedRoleChanges" ], "properties": { + "parameters": { + "$ref": "#/components/schemas/LayoutParameters", + "description": "Layout parameters used when the current layout was computed" + }, + "partitionSize": { + "type": "integer", + "format": "int64", + "description": "The size, in bytes, of one Garage partition (= a shard)", + "minimum": 0 + }, "roles": { "type": "array", "items": { - "$ref": "#/components/schemas/NodeRoleResp" - } + "$ref": "#/components/schemas/LayoutNodeRole" + }, + "description": "List of nodes that currently have a role in the cluster layout" + }, + "stagedParameters": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/LayoutParameters", + "description": "Layout parameters to use when computing the next version of\nthe cluster layout" + } + ] }, "stagedRoleChanges": { "type": 
"array", "items": { "$ref": "#/components/schemas/NodeRoleChange" - } + }, + "description": "List of nodes that will have a new role or whose role will be\nremoved in the next version of the cluster layout" }, "version": { "type": "integer", "format": "int64", + "description": "The current version number of the cluster layout", "minimum": 0 } } @@ -1918,13 +2156,15 @@ "layoutVersion": { "type": "integer", "format": "int64", + "description": "Current version number of the cluster layout", "minimum": 0 }, "nodes": { "type": "array", "items": { "$ref": "#/components/schemas/NodeResp" - } + }, + "description": "List of nodes that are either currently connected, part of the\ncurrent cluster layout, or part of an older cluster layout that\nis still active in the cluster (being drained)." } } }, @@ -2021,6 +2261,70 @@ } } }, + "LayoutNodeRole": { + "type": "object", + "required": [ + "id", + "zone", + "tags" + ], + "properties": { + "capacity": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "Capacity (in bytes) assigned by the cluster administrator,\nabsent for gateway nodes", + "minimum": 0 + }, + "id": { + "type": "string", + "description": "Identifier of the node" + }, + "storedPartitions": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "Number of partitions stored on this node\n(a result of the layout computation)", + "minimum": 0 + }, + "tags": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of tags assigned by the cluster administrator" + }, + "usableCapacity": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "Capacity (in bytes) that is actually usable on this node in the current\nlayout, which is equal to `stored_partitions` × `partition_size`", + "minimum": 0 + }, + "zone": { + "type": "string", + "description": "Zone name assigned by the cluster administrator" + } + } + }, + "LayoutParameters": { + "type": "object", + "required": [ + "zoneRedundancy" + ], + "properties": { + "zoneRedundancy": { + "$ref": "#/components/schemas/ZoneRedundancy", + "description": "Minimum number of zones in which a data partition must be replicated" + } + } + }, "ListBucketsResponse": { "type": "array", "items": { @@ -2804,6 +3108,35 @@ } } }, + "NodeAssignedRole": { + "type": "object", + "required": [ + "zone", + "tags" + ], + "properties": { + "capacity": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "Capacity (in bytes) assigned by the cluster administrator,\nabsent for gateway nodes", + "minimum": 0 + }, + "tags": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of tags assigned by the cluster administrator" + }, + "zone": { + "type": "string", + "description": "Zone name assigned by the cluster administrator" + } + } + }, "NodeResp": { "type": "object", "required": [ @@ -2816,7 +3149,8 @@ "type": [ "string", "null" - ] + ], + "description": "Socket address used by other nodes to connect to this node for RPC" }, "dataPartition": { "oneOf": [ @@ -2824,24 +3158,29 @@ "type": "null" }, { - "$ref": "#/components/schemas/FreeSpaceResp" + "$ref": "#/components/schemas/FreeSpaceResp", + "description": "Total and available space on the disk partition(s) containing the data\ndirectory(ies)" } ] }, "draining": { - "type": "boolean" + "type": "boolean", + "description": "Whether this node is part of an older layout version and is draining data." 
}, "hostname": { "type": [ "string", "null" - ] + ], + "description": "Hostname of the node" }, "id": { - "type": "string" + "type": "string", + "description": "Full-length node identifier" }, "isUp": { - "type": "boolean" + "type": "boolean", + "description": "Whether this node is connected in the cluster" }, "lastSeenSecsAgo": { "type": [ @@ -2849,6 +3188,7 @@ "null" ], "format": "int64", + "description": "For disconnected nodes, the number of seconds since last contact,\nor `null` if no contact was established since Garage restarted.", "minimum": 0 }, "metadataPartition": { @@ -2857,7 +3197,8 @@ "type": "null" }, { - "$ref": "#/components/schemas/FreeSpaceResp" + "$ref": "#/components/schemas/FreeSpaceResp", + "description": "Total and available space on the disk partition containing the\nmetadata directory" } ] }, @@ -2867,7 +3208,8 @@ "type": "null" }, { - "$ref": "#/components/schemas/NodeRoleResp" + "$ref": "#/components/schemas/NodeAssignedRole", + "description": "Role assigned to this node in the current cluster layout" } ] } @@ -2906,67 +3248,72 @@ } } }, + { + "$ref": "#/components/schemas/NodeAssignedRole" + } + ] + }, + "NodeUpdateTrackers": { + "type": "object", + "required": [ + "ack", + "sync", + "syncAck" + ], + "properties": { + "ack": { + "type": "integer", + "format": "int64", + "minimum": 0 + }, + "sync": { + "type": "integer", + "format": "int64", + "minimum": 0 + }, + "syncAck": { + "type": "integer", + "format": "int64", + "minimum": 0 + } + } + }, + "PreviewClusterLayoutChangesResponse": { + "oneOf": [ { "type": "object", "required": [ - "zone", - "tags" + "error" ], "properties": { - "capacity": { - "type": [ - "integer", - "null" - ], - "format": "int64", - "description": "New capacity (in bytes) of the node", - "minimum": 0 - }, - "tags": { + "error": { + "type": "string", + "description": "Error message indicating that the layout could not be computed\nwith the provided configuration" + } + } + }, + { + "type": "object", + "required": [ + "message", + "newLayout" + ], + "properties": { + "message": { "type": "array", "items": { "type": "string" }, - "description": "New tags of the node" + "description": "Plain-text information about the layout computation\n(do not try to parse this)" }, - "zone": { - "type": "string", - "description": "New zone of the node" + "newLayout": { + "$ref": "#/components/schemas/GetClusterLayoutResponse", + "description": "Details about the new cluster layout" } } } ] }, - "NodeRoleResp": { - "type": "object", - "required": [ - "id", - "zone", - "tags" - ], - "properties": { - "capacity": { - "type": [ - "integer", - "null" - ], - "format": "int64", - "minimum": 0 - }, - "id": { - "type": "string" - }, - "tags": { - "type": "array", - "items": { - "type": "string" - } - }, - "zone": { - "type": "string" - } - } - }, "RemoveBucketAliasRequest": { "allOf": [ { @@ -3109,9 +3456,26 @@ } }, "UpdateClusterLayoutRequest": { - "type": "array", - "items": { - "$ref": "#/components/schemas/NodeRoleChange" + "type": "object", + "properties": { + "parameters": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/LayoutParameters", + "description": "New layout computation parameters to use" + } + ] + }, + "roles": { + "type": "array", + "items": { + "$ref": "#/components/schemas/NodeRoleChange" + }, + "description": "New node roles to assign or remove in the cluster layout" + } } }, "UpdateClusterLayoutResponse": { @@ -3289,6 +3653,31 @@ ] } ] + }, + "ZoneRedundancy": { + "oneOf": [ + { + "type": "object", + "description": 
"Partitions must be replicated in at least this number of\ndistinct zones.", + "required": [ + "atLeast" + ], + "properties": { + "atLeast": { + "type": "integer", + "description": "Partitions must be replicated in at least this number of\ndistinct zones.", + "minimum": 0 + } + } + }, + { + "type": "string", + "description": "Partitions must be replicated in as many zones as possible:\nas many zones as there are replicas, if there are enough distinct\nzones, or at least one in each zone otherwise.", + "enum": [ + "maximum" + ] + } + ] } }, "securitySchemes": { diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 4ec62aa9..78706ce3 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -51,9 +51,12 @@ admin_endpoints![ // Layout operations GetClusterLayout, + GetClusterLayoutHistory, UpdateClusterLayout, + PreviewClusterLayoutChanges, ApplyClusterLayout, RevertClusterLayout, + ClusterLayoutSkipDeadNodes, // Access key operations ListKeys, @@ -165,40 +168,61 @@ pub struct GetClusterStatusRequest; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct GetClusterStatusResponse { + /// Current version number of the cluster layout pub layout_version: u64, + /// List of nodes that are either currently connected, part of the + /// current cluster layout, or part of an older cluster layout that + /// is still active in the cluster (being drained). pub nodes: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, Default, ToSchema)] #[serde(rename_all = "camelCase")] pub struct NodeResp { + /// Full-length node identifier pub id: String, - pub role: Option, - #[schema(value_type = Option )] + /// Role assigned to this node in the current cluster layout + pub role: Option, + /// Socket address used by other nodes to connect to this node for RPC + #[schema(value_type = Option)] pub addr: Option, + /// Hostname of the node pub hostname: Option, + /// Whether this node is connected in the cluster pub is_up: bool, + /// For disconnected nodes, the number of seconds since last contact, + /// or `null` if no contact was established since Garage restarted. pub last_seen_secs_ago: Option, + /// Whether this node is part of an older layout version and is draining data. 
pub draining: bool, - #[serde(skip_serializing_if = "Option::is_none")] + /// Total and available space on the disk partition(s) containing the data + /// directory(ies) + #[serde(default, skip_serializing_if = "Option::is_none")] pub data_partition: Option, - #[serde(skip_serializing_if = "Option::is_none")] + /// Total and available space on the disk partition containing the + /// metadata directory + #[serde(default, skip_serializing_if = "Option::is_none")] pub metadata_partition: Option, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] -pub struct NodeRoleResp { - pub id: String, +pub struct NodeAssignedRole { + /// Zone name assigned by the cluster administrator pub zone: String, - pub capacity: Option, + /// List of tags assigned by the cluster administrator pub tags: Vec, + /// Capacity (in bytes) assigned by the cluster administrator, + /// absent for gateway nodes + pub capacity: Option, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct FreeSpaceResp { + /// Number of bytes available pub available: u64, + /// Total number of bytes pub total: u64, } @@ -270,9 +294,40 @@ pub struct GetClusterLayoutRequest; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct GetClusterLayoutResponse { + /// The current version number of the cluster layout pub version: u64, - pub roles: Vec, + /// List of nodes that currently have a role in the cluster layout + pub roles: Vec, + /// Layout parameters used when the current layout was computed + pub parameters: LayoutParameters, + /// The size, in bytes, of one Garage partition (= a shard) + pub partition_size: u64, + /// List of nodes that will have a new role or whose role will be + /// removed in the next version of the cluster layout pub staged_role_changes: Vec, + /// Layout parameters to use when computing the next version of + /// the cluster layout + pub staged_parameters: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct LayoutNodeRole { + /// Identifier of the node + pub id: String, + /// Zone name assigned by the cluster administrator + pub zone: String, + /// List of tags assigned by the cluster administrator + pub tags: Vec, + /// Capacity (in bytes) assigned by the cluster administrator, + /// absent for gateway nodes + pub capacity: Option, + /// Number of partitions stored on this node + /// (a result of the layout computation) + pub stored_partitions: Option, + /// Capacity (in bytes) that is actually usable on this node in the current + /// layout, which is equal to `stored_partitions` × `partition_size` + pub usable_capacity: Option, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] @@ -293,36 +348,139 @@ pub enum NodeRoleChangeEnum { remove: bool, }, #[serde(rename_all = "camelCase")] - Update { - /// New zone of the node - zone: String, - /// New capacity (in bytes) of the node - capacity: Option, - /// New tags of the node - tags: Vec, - }, + Update(NodeAssignedRole), +} + +#[derive(Copy, Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct LayoutParameters { + /// Minimum number of zones in which a data partition must be replicated + pub zone_redundancy: ZoneRedundancy, +} + +#[derive(Copy, Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub enum ZoneRedundancy { + /// Partitions must be replicated in at least this 
number of + /// distinct zones. + AtLeast(usize), + /// Partitions must be replicated in as many zones as possible: + /// as many zones as there are replicas, if there are enough distinct + /// zones, or at least one in each zone otherwise. + Maximum, +} + +// ---- GetClusterLayoutHistory ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterLayoutHistoryRequest; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct GetClusterLayoutHistoryResponse { + /// The current version number of the cluster layout + pub current_version: u64, + /// All nodes in the cluster are aware of layout versions up to + /// this version number (at least) + pub min_ack: u64, + /// Layout version history + pub versions: Vec, + /// Detailed update trackers for nodes (see + /// `https://garagehq.deuxfleurs.fr/blog/2023-12-preserving-read-after-write-consistency/`) + pub update_trackers: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ClusterLayoutVersion { + /// Version number of this layout version + pub version: u64, + /// Status of this layout version + pub status: ClusterLayoutVersionStatus, + /// Number of nodes with an assigned storage capacity in this layout version + pub storage_nodes: u64, + /// Number of nodes with a gateway role in this layout version + pub gateway_nodes: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub enum ClusterLayoutVersionStatus { + /// This is the most up-to-date layout version + Current, + /// This version is still active in the cluster because metadata + /// is being rebalanced or migrated from old nodes + Draining, + /// This version is no longer active in the cluster for metadata + /// reads and writes. Note that there is still the possibility + /// that data blocks are being migrated away from nodes in this + /// layout version. 
+ Historical, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct NodeUpdateTrackers { + pub ack: u64, + pub sync: u64, + pub sync_ack: u64, } // ---- UpdateClusterLayout ---- #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -pub struct UpdateClusterLayoutRequest(pub Vec); +pub struct UpdateClusterLayoutRequest { + /// New node roles to assign or remove in the cluster layout + #[serde(default)] + pub roles: Vec, + /// New layout computation parameters to use + #[serde(default)] + pub parameters: Option, +} #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse); +// ---- PreviewClusterLayoutChanges ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PreviewClusterLayoutChangesRequest; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(untagged)] +pub enum PreviewClusterLayoutChangesResponse { + #[serde(rename_all = "camelCase")] + Error { + /// Error message indicating that the layout could not be computed + /// with the provided configuration + error: String, + }, + #[serde(rename_all = "camelCase")] + Success { + /// Plain-text information about the layout computation + /// (do not try to parse this) + message: Vec, + /// Details about the new cluster layout + new_layout: GetClusterLayoutResponse, + }, +} + // ---- ApplyClusterLayout ---- #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ApplyClusterLayoutRequest { + /// As a safety measure, the new version number of the layout must + /// be specified here pub version: u64, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] pub struct ApplyClusterLayoutResponse { + /// Plain-text information about the layout computation + /// (do not try to parse this) pub message: Vec, + /// Details about the new cluster layout pub layout: GetClusterLayoutResponse, } @@ -334,6 +492,29 @@ pub struct RevertClusterLayoutRequest; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse); +// ---- ClusterLayoutSkipDeadNodes ---- + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ClusterLayoutSkipDeadNodesRequest { + /// Version number of the layout to assume is currently up-to-date. + /// This will generally be the current layout version. 
+ pub version: u64, + /// Allow the skip even if a quorum of nodes could not be found for + /// the data among the remaining nodes + pub allow_missing_data: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ClusterLayoutSkipDeadNodesResponse { + /// Nodes for which the ACK update tracker has been updated to `version` + pub ack_updated: Vec, + /// If `allow_missing_data` is set, + /// nodes for which the SYNC update tracker has been updated to `version` + pub sync_updated: Vec, +} + // ********************************************** // Access key operations // ********************************************** @@ -367,7 +548,7 @@ pub struct GetKeyInfoRequest { pub struct GetKeyInfoResponse { pub name: String, pub access_key_id: String, - #[serde(skip_serializing_if = "is_default")] + #[serde(default, skip_serializing_if = "is_default")] pub secret_access_key: Option, pub permissions: KeyPerm, pub buckets: Vec, diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 13946e2b..f41766b9 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use garage_util::crdt::*; use garage_util::data::*; +use garage_util::error::Error as GarageError; use garage_rpc::layout; @@ -54,8 +55,7 @@ impl RequestHandler for GetClusterStatusRequest { for (id, _, role) in layout.current().roles.items().iter() { if let layout::NodeRoleV(Some(r)) = role { - let role = NodeRoleResp { - id: hex::encode(id), + let role = NodeAssignedRole { zone: r.zone.to_string(), capacity: r.capacity, tags: r.tags.clone(), @@ -181,17 +181,23 @@ impl RequestHandler for GetClusterLayoutRequest { } fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse { - let roles = layout - .current() + let current = layout.current(); + + let roles = current .roles .items() .iter() .filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x))) - .map(|(k, v)| NodeRoleResp { - id: hex::encode(k), - zone: v.zone.clone(), - capacity: v.capacity, - tags: v.tags.clone(), + .map(|(k, v)| { + let stored_partitions = current.get_node_usage(k).ok().map(|x| x as u64); + LayoutNodeRole { + id: hex::encode(k), + zone: v.zone.clone(), + capacity: v.capacity, + stored_partitions, + usable_capacity: stored_partitions.map(|x| x * current.partition_size), + tags: v.tags.clone(), + } }) .collect::>(); @@ -201,7 +207,7 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp .roles .items() .iter() - .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v)) + .filter(|(k, _, v)| current.roles.get(k) != Some(v)) .map(|(k, _, v)| match &v.0 { None => NodeRoleChange { id: hex::encode(k), @@ -209,19 +215,108 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp }, Some(r) => NodeRoleChange { id: hex::encode(k), - action: NodeRoleChangeEnum::Update { + action: NodeRoleChangeEnum::Update(NodeAssignedRole { zone: r.zone.clone(), capacity: r.capacity, tags: r.tags.clone(), - }, + }), }, }) .collect::>(); + let staged_parameters = if *layout.staging.get().parameters.get() != current.parameters { + Some((*layout.staging.get().parameters.get()).into()) + } else { + None + }; + GetClusterLayoutResponse { - version: layout.current().version, + version: current.version, roles, + partition_size: current.partition_size, + parameters: current.parameters.into(), staged_role_changes, + staged_parameters, + } +} + +impl RequestHandler for GetClusterLayoutHistoryRequest { + type Response 
= GetClusterLayoutHistoryResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let layout_helper = garage.system.cluster_layout(); + let layout = layout_helper.inner(); + let min_stored = layout.min_stored(); + + let versions = layout + .versions + .iter() + .rev() + .chain(layout.old_versions.iter().rev()) + .map(|ver| { + let status = if ver.version == layout.current().version { + ClusterLayoutVersionStatus::Current + } else if ver.version >= min_stored { + ClusterLayoutVersionStatus::Draining + } else { + ClusterLayoutVersionStatus::Historical + }; + ClusterLayoutVersion { + version: ver.version, + status, + storage_nodes: ver + .roles + .items() + .iter() + .filter( + |(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()), + ) + .count() as u64, + gateway_nodes: ver + .roles + .items() + .iter() + .filter( + |(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()), + ) + .count() as u64, + } + }) + .collect::>(); + + let all_nodes = layout.get_all_nodes(); + let min_ack = layout_helper.ack_map_min(); + + let update_trackers = if layout.versions.len() > 1 { + Some( + all_nodes + .iter() + .map(|node| { + ( + hex::encode(&node), + NodeUpdateTrackers { + ack: layout.update_trackers.ack_map.get(node, min_stored), + sync: layout.update_trackers.sync_map.get(node, min_stored), + sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored), + }, + ) + }) + .collect(), + ) + } else { + None + }; + + Ok(GetClusterLayoutHistoryResponse { + current_version: layout.current().version, + min_ack, + versions, + update_trackers, + }) } } @@ -242,21 +337,26 @@ impl RequestHandler for UpdateClusterLayoutRequest { let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging.get().roles); - for change in self.0 { + for change in self.roles { let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; let new_role = match change.action { NodeRoleChangeEnum::Remove { remove: true } => None, - NodeRoleChangeEnum::Update { + NodeRoleChangeEnum::Update(NodeAssignedRole { zone, capacity, tags, - } => Some(layout::NodeRole { - zone, - capacity, - tags, - }), + }) => { + if matches!(capacity, Some(cap) if cap < 1024) { + return Err(Error::bad_request("Capacity should be at least 1K (1024)")); + } + Some(layout::NodeRole { + zone, + capacity, + tags, + }) + } _ => return Err(Error::bad_request("Invalid layout change")), }; @@ -267,6 +367,22 @@ impl RequestHandler for UpdateClusterLayoutRequest { .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } + if let Some(param) = self.parameters { + if let ZoneRedundancy::AtLeast(r_int) = param.zone_redundancy { + if r_int > layout.current().replication_factor { + return Err(Error::bad_request(format!( + "The zone redundancy must be smaller or equal to the replication factor ({}).", + layout.current().replication_factor + ))); + } else if r_int < 1 { + return Err(Error::bad_request( + "The zone redundancy must be at least 1.", + )); + } + } + layout.staging.get_mut().parameters.update(param.into()); + } + garage .system .layout_manager @@ -278,6 +394,29 @@ impl RequestHandler for UpdateClusterLayoutRequest { } } +impl RequestHandler for PreviewClusterLayoutChangesRequest { + type Response = PreviewClusterLayoutChangesResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let layout = 
garage.system.cluster_layout().inner().clone(); + let new_ver = layout.current().version + 1; + match layout.apply_staged_changes(new_ver) { + Err(GarageError::Message(error)) => { + Ok(PreviewClusterLayoutChangesResponse::Error { error }) + } + Err(e) => Err(e.into()), + Ok((new_layout, msg)) => Ok(PreviewClusterLayoutChangesResponse::Success { + message: msg, + new_layout: format_cluster_layout(&new_layout), + }), + } + } +} + impl RequestHandler for ApplyClusterLayoutRequest { type Response = ApplyClusterLayoutResponse; @@ -287,7 +426,7 @@ impl RequestHandler for ApplyClusterLayoutRequest { _admin: &Admin, ) -> Result { let layout = garage.system.cluster_layout().inner().clone(); - let (layout, msg) = layout.apply_staged_changes(Some(self.version))?; + let (layout, msg) = layout.apply_staged_changes(self.version)?; garage .system @@ -322,3 +461,100 @@ impl RequestHandler for RevertClusterLayoutRequest { Ok(RevertClusterLayoutResponse(res)) } } + +impl RequestHandler for ClusterLayoutSkipDeadNodesRequest { + type Response = ClusterLayoutSkipDeadNodesResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let status = garage.system.get_known_nodes(); + + let mut layout = garage.system.cluster_layout().inner().clone(); + let mut ack_updated = vec![]; + let mut sync_updated = vec![]; + + if layout.versions.len() == 1 { + return Err(Error::bad_request( + "This command cannot be called when there is only one live cluster layout version", + )); + } + + let min_v = layout.min_stored(); + if self.version <= min_v || self.version > layout.current().version { + return Err(Error::bad_request(format!( + "Invalid version, you may use the following version numbers: {}", + (min_v + 1..=layout.current().version) + .map(|x| x.to_string()) + .collect::>() + .join(" ") + ))); + } + + let all_nodes = layout.get_all_nodes(); + for node in all_nodes.iter() { + // Update ACK tracker for dead nodes or for all nodes if --allow-missing-data + if self.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) { + if layout.update_trackers.ack_map.set_max(*node, self.version) { + ack_updated.push(hex::encode(node)); + } + } + + // If --allow-missing-data, update SYNC tracker for all nodes. 
+ if self.allow_missing_data { + if layout.update_trackers.sync_map.set_max(*node, self.version) { + sync_updated.push(hex::encode(node)); + } + } + } + + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; + + Ok(ClusterLayoutSkipDeadNodesResponse { + ack_updated, + sync_updated, + }) + } +} + +// ---- + +impl From for ZoneRedundancy { + fn from(x: layout::ZoneRedundancy) -> Self { + match x { + layout::ZoneRedundancy::Maximum => ZoneRedundancy::Maximum, + layout::ZoneRedundancy::AtLeast(x) => ZoneRedundancy::AtLeast(x), + } + } +} + +impl Into for ZoneRedundancy { + fn into(self) -> layout::ZoneRedundancy { + match self { + ZoneRedundancy::Maximum => layout::ZoneRedundancy::Maximum, + ZoneRedundancy::AtLeast(x) => layout::ZoneRedundancy::AtLeast(x), + } + } +} + +impl From for LayoutParameters { + fn from(x: layout::LayoutParameters) -> Self { + LayoutParameters { + zone_redundancy: x.zone_redundancy.into(), + } + } +} + +impl Into for LayoutParameters { + fn into(self) -> layout::LayoutParameters { + layout::LayoutParameters { + zone_redundancy: self.zone_redundancy.into(), + } + } +} diff --git a/src/api/admin/openapi.rs b/src/api/admin/openapi.rs index 0e48bf54..01a694e5 100644 --- a/src/api/admin/openapi.rs +++ b/src/api/admin/openapi.rs @@ -88,6 +88,19 @@ Returns the cluster's current layout, including: )] fn GetClusterLayout() -> () {} +#[utoipa::path(get, + path = "/v2/GetClusterLayoutHistory", + tag = "Cluster layout", + description = " +Returns the history of layouts in the cluster + ", + responses( + (status = 200, description = "Cluster layout history", body = GetClusterLayoutHistoryResponse), + (status = 500, description = "Internal server error") + ), +)] +fn GetClusterLayoutHistory() -> () {} + #[utoipa::path(post, path = "/v2/UpdateClusterLayout", tag = "Cluster layout", @@ -117,6 +130,21 @@ Contrary to the CLI that may update only a subset of the fields capacity, zone a )] fn UpdateClusterLayout() -> () {} +#[utoipa::path(post, + path = "/v2/PreviewClusterLayoutChanges", + tag = "Cluster layout", + description = " +Computes a new layout taking into account the staged parameters, and returns it with detailed statistics. The new layout is not applied in the cluster. 
+ +*Note: do not try to parse the `message` field of the response, it is given as an array of string specifically because its format is not stable.* + ", + responses( + (status = 200, description = "Information about the new layout", body = PreviewClusterLayoutChangesResponse), + (status = 500, description = "Internal server error") + ), +)] +fn PreviewClusterLayoutChanges() -> () {} + #[utoipa::path(post, path = "/v2/ApplyClusterLayout", tag = "Cluster layout", @@ -144,6 +172,18 @@ fn ApplyClusterLayout() -> () {} )] fn RevertClusterLayout() -> () {} +#[utoipa::path(post, + path = "/v2/ClusterLayoutSkipDeadNodes", + tag = "Cluster layout", + description = "Force progress in layout update trackers", + request_body = ClusterLayoutSkipDeadNodesRequest, + responses( + (status = 200, description = "Request has been taken into account", body = ClusterLayoutSkipDeadNodesResponse), + (status = 500, description = "Internal server error") + ), +)] +fn ClusterLayoutSkipDeadNodes() -> () {} + // ********************************************** // Access key operations // ********************************************** @@ -685,9 +725,12 @@ impl Modify for SecurityAddon { ConnectClusterNodes, // Layout operations GetClusterLayout, + GetClusterLayoutHistory, UpdateClusterLayout, + PreviewClusterLayoutChanges, ApplyClusterLayout, RevertClusterLayout, + ClusterLayoutSkipDeadNodes, // Key operations ListKeys, GetKeyInfo, diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index 2c2067dc..9f6106e5 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -36,9 +36,12 @@ impl AdminApiRequest { POST ConnectClusterNodes (body), // Layout endpoints GET GetClusterLayout (), + GET GetClusterLayoutHistory (), POST UpdateClusterLayout (body), + POST PreviewClusterLayoutChanges (), POST ApplyClusterLayout (body), POST RevertClusterLayout (), + POST ClusterLayoutSkipDeadNodes (body), // API key endpoints GET GetKeyInfo (query_opt::id, query_opt::search, parse_default(false)::show_secret_key), POST UpdateKey (body_field, query::id), @@ -108,10 +111,7 @@ impl AdminApiRequest { Endpoint::GetClusterLayout => { Ok(AdminApiRequest::GetClusterLayout(GetClusterLayoutRequest)) } - Endpoint::UpdateClusterLayout => { - let updates = parse_json_body::(req).await?; - Ok(AdminApiRequest::UpdateClusterLayout(updates)) - } + // UpdateClusterLayout semantics changed Endpoint::ApplyClusterLayout => { let param = parse_json_body::(req).await?; Ok(AdminApiRequest::ApplyClusterLayout(param)) diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs deleted file mode 100644 index 3bbc2b86..00000000 --- a/src/garage/admin/mod.rs +++ /dev/null @@ -1,540 +0,0 @@ -mod block; -mod bucket; -mod key; - -use std::collections::HashMap; -use std::fmt::Write; -use std::future::Future; -use std::sync::Arc; - -use futures::future::FutureExt; - -use serde::{Deserialize, Serialize}; - -use format_table::format_table_to_string; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; -use garage_util::error::Error as GarageError; - -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::layout::PARTITION_BITS; -use garage_rpc::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::bucket_table::*; -use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::key_table::*; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::Version; - -use crate::cli::*; -use 
crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - Stats(StatsOpt), - Worker(WorkerOperation), - BlockOperation(BlockOperation), - MetaOperation(MetaOperation), - - // Replies - Ok(String), - BucketList(Vec), - BucketInfo { - bucket: Bucket, - relevant_keys: HashMap, - counters: HashMap, - mpu_counters: HashMap, - }, - KeyList(Vec<(String, String)>), - KeyInfo(Key, HashMap), - WorkerList( - HashMap, - WorkerListOpt, - ), - WorkerVars(Vec<(Uuid, String, String)>), - WorkerInfo(usize, garage_util::background::WorkerInfo), - BlockErrorList(Vec), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, - }, -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - background: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc, background: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in self.garage.system.get_known_nodes().iter() { - if node.is_up && !all_nodes.contains(&node.id) { - all_nodes.push(node.id); - } - } - - for node in all_nodes.iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - 
&mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table)?); - table.push(self.gather_table_stats(&self.garage.key_table)?); - table.push(self.gather_table_stats(&self.garage.object_table)?); - table.push(self.gather_table_stats(&self.garage.version_table)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? - ) - .unwrap(); - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics for current nodes - let layout = &self.garage.system.cluster_layout(); - let mut node_partition_count = HashMap::::new(); - for short_id in layout.current().ring_assignment_data.iter() { - let id = layout.current().node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail 
= node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats(&self, t: &Arc>) -> Result - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } - - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? 
- { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) - } - } - - // ================ META DB COMMANDS ==================== - - async fn handle_meta_cmd(self: &Arc, mo: &MetaOperation) -> Result { - match mo { - MetaOperation::Snapshot { all: true } => { - let to = self.garage.system.cluster_layout().all_nodes().to_vec(); - - let resps = futures::future::join_all(to.iter().map(|to| async move { - let to = (*to).into(); - self.endpoint - .call( - &to, - AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), - PRIO_NORMAL, - ) - .await? 
- })) - .await; - - let mut ret = vec![]; - for (to, resp) in to.iter().zip(resps.iter()) { - let res_str = match resp { - Ok(_) => "ok".to_string(), - Err(e) => format!("error: {}", e), - }; - ret.push(format!("{:?}\t{}", to, res_str)); - } - - if resps.iter().any(Result::is_err) { - Err(GarageError::Message(format_table_to_string(ret)).into()) - } else { - Ok(AdminRpc::Ok(format_table_to_string(ret))) - } - } - MetaOperation::Snapshot { all: false } => { - garage_model::snapshot::async_snapshot_metadata(&self.garage).await?; - Ok(AdminRpc::Ok("Snapshot has been saved.".into())) - } - } - } -} - -impl EndpointHandler for AdminRpcHandler { - fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> impl Future> + Send { - let self2 = self.clone(); - async move { - match message { - AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } - .boxed() - } -} diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs deleted file mode 100644 index bb77cc2a..00000000 --- a/src/garage/cli/layout.rs +++ /dev/null @@ -1,387 +0,0 @@ -use bytesize::ByteSize; - -use format_table::format_table; -use garage_util::error::*; - -use garage_rpc::layout::*; -use garage_rpc::system::*; -use garage_rpc::*; - -use crate::cli::structs::*; - -pub async fn cmd_show_layout( - rpc_cli: &Endpoint, - rpc_host: NodeID, -) -> Result<(), Error> { - let layout = fetch_layout(rpc_cli, rpc_host).await?; - - println!("==== CURRENT CLUSTER LAYOUT ===="); - print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); - println!(); - println!( - "Current cluster layout version: {}", - layout.current().version - ); - - let has_role_changes = print_staging_role_changes(&layout); - if has_role_changes { - let v = layout.current().version; - let res_apply = layout.apply_staged_changes(Some(v + 1)); - - // this will print the stats of what partitions - // will move around when we apply - match res_apply { - Ok((layout, msg)) => { - println!(); - println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); - print_cluster_layout(layout.current(), "No nodes have a role in the new layout."); - println!(); - - for line in msg.iter() { - println!("{}", line); - } - println!("To enact the staged role changes, type:"); - println!(); - println!(" garage layout apply --version {}", v + 1); - println!(); - println!("You can also revert all proposed changes with: garage layout revert"); - } - Err(e) => { - println!("Error while trying to compute the assignment: {}", e); - println!("This new layout cannot yet be applied."); - println!("You can also revert all proposed changes with: garage layout revert"); - } - } - } - - Ok(()) -} - -pub async fn cmd_config_layout( - rpc_cli: &Endpoint, - rpc_host: NodeID, - config_opt: ConfigLayoutOpt, -) -> Result<(), Error> { - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - let mut did_something = false; - match config_opt.redundancy { - None => (), - Some(r_str) => { - let r = r_str - .parse::() - .ok_or_message("invalid 
zone redundancy value")?; - if let ZoneRedundancy::AtLeast(r_int) = r { - if r_int > layout.current().replication_factor { - return Err(Error::Message(format!( - "The zone redundancy must be smaller or equal to the \ - replication factor ({}).", - layout.current().replication_factor - ))); - } else if r_int < 1 { - return Err(Error::Message( - "The zone redundancy must be at least 1.".into(), - )); - } - } - - layout - .staging - .get_mut() - .parameters - .update(LayoutParameters { zone_redundancy: r }); - println!("The zone redundancy parameter has been set to '{}'.", r); - did_something = true; - } - } - - if !did_something { - return Err(Error::Message( - "Please specify an action for `garage layout config`".into(), - )); - } - - send_layout(rpc_cli, rpc_host, layout).await?; - Ok(()) -} - -pub async fn cmd_layout_history( - rpc_cli: &Endpoint, - rpc_host: NodeID, -) -> Result<(), Error> { - let layout = fetch_layout(rpc_cli, rpc_host).await?; - let min_stored = layout.min_stored(); - - println!("==== LAYOUT HISTORY ===="); - let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()]; - for ver in layout - .versions - .iter() - .rev() - .chain(layout.old_versions.iter().rev()) - { - let status = if ver.version == layout.current().version { - "current" - } else if ver.version >= min_stored { - "draining" - } else { - "historical" - }; - table.push(format!( - "#{}\t{}\t{}\t{}", - ver.version, - status, - ver.roles - .items() - .iter() - .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some())) - .count(), - ver.roles - .items() - .iter() - .filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none())) - .count(), - )); - } - format_table(table); - println!(); - - if layout.versions.len() > 1 { - println!("==== UPDATE TRACKERS ===="); - println!("Several layout versions are currently live in the cluster, and data is being migrated."); - println!( - "This is the internal data that Garage stores to know which nodes have what data." - ); - println!(); - let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()]; - let all_nodes = layout.get_all_nodes(); - for node in all_nodes.iter() { - table.push(format!( - "{:?}\t#{}\t#{}\t#{}", - node, - layout.update_trackers.ack_map.get(node, min_stored), - layout.update_trackers.sync_map.get(node, min_stored), - layout.update_trackers.sync_ack_map.get(node, min_stored), - )); - } - table[1..].sort(); - format_table(table); - - let min_ack = layout - .update_trackers - .ack_map - .min_among(&all_nodes, layout.min_stored()); - - println!(); - println!( - "If some nodes are not catching up to the latest layout version in the update trackers," - ); - println!("it might be because they are offline or unable to complete a sync successfully."); - if min_ack < layout.current().version { - println!( - "You may force progress using `garage layout skip-dead-nodes --version {}`", - layout.current().version - ); - } else { - println!( - "You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.", - layout.current().version - ); - } - } else { - println!("Your cluster is currently in a stable state with a single live layout version."); - println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,"); - println!( - "so you might want to keep old nodes online until their data directories become empty." 
- ); - } - - Ok(()) -} - -pub async fn cmd_layout_skip_dead_nodes( - rpc_cli: &Endpoint, - rpc_host: NodeID, - opt: SkipDeadNodesOpt, -) -> Result<(), Error> { - let status = fetch_status(rpc_cli, rpc_host).await?; - let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - - if layout.versions.len() == 1 { - return Err(Error::Message( - "This command cannot be called when there is only one live cluster layout version" - .into(), - )); - } - - let min_v = layout.min_stored(); - if opt.version <= min_v || opt.version > layout.current().version { - return Err(Error::Message(format!( - "Invalid version, you may use the following version numbers: {}", - (min_v + 1..=layout.current().version) - .map(|x| x.to_string()) - .collect::>() - .join(" ") - ))); - } - - let all_nodes = layout.get_all_nodes(); - let mut did_something = false; - for node in all_nodes.iter() { - // Update ACK tracker for dead nodes or for all nodes if --allow-missing-data - if opt.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) { - if layout.update_trackers.ack_map.set_max(*node, opt.version) { - println!("Increased the ACK tracker for node {:?}", node); - did_something = true; - } - } - - // If --allow-missing-data, update SYNC tracker for all nodes. - if opt.allow_missing_data { - if layout.update_trackers.sync_map.set_max(*node, opt.version) { - println!("Increased the SYNC tracker for node {:?}", node); - did_something = true; - } - } - } - - if did_something { - send_layout(rpc_cli, rpc_host, layout).await?; - println!("Success."); - Ok(()) - } else if !opt.allow_missing_data { - Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into())) - } else { - Err(Error::Message( - "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(), - )) - } -} - -// --- utility --- - -pub async fn fetch_status( - rpc_cli: &Endpoint, - rpc_host: NodeID, -) -> Result, Error> { - match rpc_cli - .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), - resp => Err(Error::unexpected_rpc_message(resp)), - } -} - -pub async fn fetch_layout( - rpc_cli: &Endpoint, - rpc_host: NodeID, -) -> Result { - match rpc_cli - .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) - .await?? 
- { - SystemRpc::AdvertiseClusterLayout(t) => Ok(t), - resp => Err(Error::unexpected_rpc_message(resp)), - } -} - -pub async fn send_layout( - rpc_cli: &Endpoint, - rpc_host: NodeID, - layout: LayoutHistory, -) -> Result<(), Error> { - rpc_cli - .call( - &rpc_host, - SystemRpc::AdvertiseClusterLayout(layout), - PRIO_NORMAL, - ) - .await??; - Ok(()) -} - -pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) { - let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; - for (id, _, role) in layout.roles.items().iter() { - let role = match &role.0 { - Some(r) => r, - _ => continue, - }; - let tags = role.tags.join(","); - let usage = layout.get_node_usage(id).unwrap_or(0); - let capacity = layout.get_node_capacity(id).unwrap_or(0); - if capacity > 0 { - table.push(format!( - "{:?}\t{}\t{}\t{}\t{} ({:.1}%)", - id, - tags, - role.zone, - role.capacity_string(), - ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false), - (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32) - )); - } else { - table.push(format!( - "{:?}\t{}\t{}\t{}", - id, - tags, - role.zone, - role.capacity_string() - )); - }; - } - if table.len() > 1 { - format_table(table); - println!(); - println!("Zone redundancy: {}", layout.parameters.zone_redundancy); - } else { - println!("{}", empty_msg); - } -} - -pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool { - let staging = layout.staging.get(); - let has_role_changes = staging - .roles - .items() - .iter() - .any(|(k, _, v)| layout.current().roles.get(k) != Some(v)); - let has_layout_changes = *staging.parameters.get() != layout.current().parameters; - - if has_role_changes || has_layout_changes { - println!(); - println!("==== STAGED ROLE CHANGES ===="); - if has_role_changes { - let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in staging.roles.items().iter() { - if layout.current().roles.get(id) == Some(role) { - continue; - } - if let Some(role) = &role.0 { - let tags = role.tags.join(","); - table.push(format!( - "{:?}\t{}\t{}\t{}", - id, - tags, - role.zone, - role.capacity_string() - )); - } else { - table.push(format!("{:?}\tREMOVED", id)); - } - } - format_table(table); - println!(); - } - if has_layout_changes { - println!( - "Zone redundancy: {}", - staging.parameters.get().zone_redundancy - ); - } - true - } else { - false - } -} diff --git a/src/garage/cli/convert_db.rs b/src/garage/cli/local/convert_db.rs similarity index 100% rename from src/garage/cli/convert_db.rs rename to src/garage/cli/local/convert_db.rs diff --git a/src/garage/cli/init.rs b/src/garage/cli/local/init.rs similarity index 100% rename from src/garage/cli/init.rs rename to src/garage/cli/local/init.rs diff --git a/src/garage/cli/local/mod.rs b/src/garage/cli/local/mod.rs new file mode 100644 index 00000000..476010b8 --- /dev/null +++ b/src/garage/cli/local/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod convert_db; +pub(crate) mod init; +pub(crate) mod repair; diff --git a/src/garage/cli/repair.rs b/src/garage/cli/local/repair.rs similarity index 100% rename from src/garage/cli/repair.rs rename to src/garage/cli/local/repair.rs diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index e007808b..60e9a5de 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,7 +1,4 @@ -pub(crate) mod structs; +pub mod structs; -pub(crate) mod convert_db; -pub(crate) mod init; -pub(crate) mod repair; - -pub(crate) mod layout; +pub mod local; +pub mod remote; diff --git 
a/src/garage/cli_v2/block.rs b/src/garage/cli/remote/block.rs similarity index 99% rename from src/garage/cli_v2/block.rs rename to src/garage/cli/remote/block.rs index bfc0db4a..933dcbdb 100644 --- a/src/garage/cli_v2/block.rs +++ b/src/garage/cli/remote/block.rs @@ -5,8 +5,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli/remote/bucket.rs similarity index 99% rename from src/garage/cli_v2/bucket.rs rename to src/garage/cli/remote/bucket.rs index c25c2c3e..9adcdbe5 100644 --- a/src/garage/cli_v2/bucket.rs +++ b/src/garage/cli/remote/bucket.rs @@ -5,8 +5,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_bucket(&self, cmd: BucketOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli/remote/cluster.rs similarity index 98% rename from src/garage/cli_v2/cluster.rs rename to src/garage/cli/remote/cluster.rs index 6eb65d12..9639df8b 100644 --- a/src/garage/cli_v2/cluster.rs +++ b/src/garage/cli/remote/cluster.rs @@ -4,9 +4,9 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::layout::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::layout::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_status(&self) -> Result<(), Error> { diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli/remote/key.rs similarity index 99% rename from src/garage/cli_v2/key.rs rename to src/garage/cli/remote/key.rs index b956906d..67843a83 100644 --- a/src/garage/cli_v2/key.rs +++ b/src/garage/cli/remote/key.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_key(&self, cmd: KeyOperation) -> Result<(), Error> { diff --git a/src/garage/cli/remote/layout.rs b/src/garage/cli/remote/layout.rs new file mode 100644 index 00000000..f350ab66 --- /dev/null +++ b/src/garage/cli/remote/layout.rs @@ -0,0 +1,474 @@ +use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::remote::*; +use crate::cli::structs::*; + +impl Cli { + pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> { + match cmd { + LayoutOperation::Show => self.cmd_show_layout().await, + LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await, + LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await, + LayoutOperation::Config(config_opt) => self.cmd_config_layout(config_opt).await, + LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await, + LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await, + LayoutOperation::History => self.cmd_layout_history().await, + LayoutOperation::SkipDeadNodes(opt) => self.cmd_skip_dead_nodes(opt).await, + } + } + + pub async fn cmd_show_layout(&self) -> Result<(), Error> { + let layout = self.api_request(GetClusterLayoutRequest).await?; + + println!("==== CURRENT CLUSTER LAYOUT ===="); + print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); + println!(); + println!("Current cluster layout 
version: {}", layout.version); + + let has_role_changes = print_staging_role_changes(&layout); + if has_role_changes { + let res_apply = self.api_request(PreviewClusterLayoutChangesRequest).await?; + + // this will print the stats of what partitions + // will move around when we apply + match res_apply { + PreviewClusterLayoutChangesResponse::Success { + message, + new_layout, + } => { + println!(); + println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); + print_cluster_layout(&new_layout, "No nodes have a role in the new layout."); + println!(); + + for line in message.iter() { + println!("{}", line); + } + println!("To enact the staged role changes, type:"); + println!(); + println!(" garage layout apply --version {}", new_layout.version); + println!(); + println!("You can also revert all proposed changes with: garage layout revert"); + } + PreviewClusterLayoutChangesResponse::Error { error } => { + println!("Error while trying to compute the assignment: {}", error); + println!("This new layout cannot yet be applied."); + println!("You can also revert all proposed changes with: garage layout revert"); + } + } + } + + Ok(()) + } + + pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> { + let status = self.api_request(GetClusterStatusRequest).await?; + let layout = self.api_request(GetClusterLayoutRequest).await?; + + let mut actions = vec![]; + + for node in opt.replace.iter() { + let id = find_matching_node(&status, &layout, &node)?; + + actions.push(NodeRoleChange { + id, + action: NodeRoleChangeEnum::Remove { remove: true }, + }); + } + + for node in opt.node_ids.iter() { + let id = find_matching_node(&status, &layout, &node)?; + + let current = get_staged_or_current_role(&id, &layout); + + let zone = opt + .zone + .clone() + .or_else(|| current.as_ref().map(|c| c.zone.clone())) + .ok_or_message("Please specify a zone with the -z flag")?; + + let capacity = if opt.gateway { + if opt.capacity.is_some() { + return Err(Error::Message("Please specify only -c or -g".into())); + } + None + } else if let Some(cap) = opt.capacity { + Some(cap.as_u64()) + } else { + current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity + }; + + let tags = if !opt.tags.is_empty() { + opt.tags.clone() + } else if let Some(cur) = current.as_ref() { + cur.tags.clone() + } else { + vec![] + }; + + actions.push(NodeRoleChange { + id, + action: NodeRoleChangeEnum::Update(NodeAssignedRole { + zone, + capacity, + tags, + }), + }); + } + + self.api_request(UpdateClusterLayoutRequest { + roles: actions, + parameters: None, + }) + .await?; + + println!("Role changes are staged but not yet committed."); + println!("Use `garage layout show` to view staged role changes,"); + println!("and `garage layout apply` to enact staged changes."); + Ok(()) + } + + pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> { + let status = self.api_request(GetClusterStatusRequest).await?; + let layout = self.api_request(GetClusterLayoutRequest).await?; + + let id = find_matching_node(&status, &layout, &opt.node_id)?; + + let actions = vec![NodeRoleChange { + id, + action: NodeRoleChangeEnum::Remove { remove: true }, + }]; + + self.api_request(UpdateClusterLayoutRequest { + roles: actions, + parameters: None, + }) + .await?; + + println!("Role removal is staged but not yet committed."); + println!("Use `garage layout show` to view staged role changes,"); + println!("and `garage layout apply` to enact staged 
changes."); + Ok(()) + } + + pub async fn cmd_config_layout(&self, config_opt: ConfigLayoutOpt) -> Result<(), Error> { + let mut did_something = false; + match config_opt.redundancy { + None => (), + Some(r_str) => { + let r = parse_zone_redundancy(&r_str)?; + + self.api_request(UpdateClusterLayoutRequest { + roles: vec![], + parameters: Some(LayoutParameters { zone_redundancy: r }), + }) + .await?; + println!( + "The zone redundancy parameter has been set to '{}'.", + display_zone_redundancy(r) + ); + did_something = true; + } + } + + if !did_something { + return Err(Error::Message( + "Please specify an action for `garage layout config`".into(), + )); + } + + Ok(()) + } + + pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> { + let missing_version_error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + + let req = ApplyClusterLayoutRequest { + version: apply_opt.version.ok_or_message(missing_version_error)?, + }; + let res = self.api_request(req).await?; + + for line in res.message.iter() { + println!("{}", line); + } + + println!("New cluster layout with updated role assignment has been applied in cluster."); + println!("Data will now be moved around between nodes accordingly."); + + Ok(()) + } + + pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> { + if !revert_opt.yes { + return Err(Error::Message( + "Please add the --yes flag to run the layout revert operation".into(), + )); + } + + self.api_request(RevertClusterLayoutRequest).await?; + + println!("All proposed role changes in cluster layout have been canceled."); + Ok(()) + } + + pub async fn cmd_layout_history(&self) -> Result<(), Error> { + let history = self.api_request(GetClusterLayoutHistoryRequest).await?; + + println!("==== LAYOUT HISTORY ===="); + let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()]; + for ver in history.versions.iter() { + table.push(format!( + "#{}\t{:?}\t{}\t{}", + ver.version, ver.status, ver.storage_nodes, ver.gateway_nodes, + )); + } + format_table(table); + println!(); + + if let Some(update_trackers) = history.update_trackers { + println!("==== UPDATE TRACKERS ===="); + println!("Several layout versions are currently live in the cluster, and data is being migrated."); + println!( + "This is the internal data that Garage stores to know which nodes have what data." + ); + println!(); + let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()]; + for (node, trackers) in update_trackers.iter() { + table.push(format!( + "{:.16}\t#{}\t#{}\t#{}", + node, trackers.ack, trackers.sync, trackers.sync_ack, + )); + } + table[1..].sort(); + format_table(table); + + println!(); + println!( + "If some nodes are not catching up to the latest layout version in the update trackers," + ); + println!( + "it might be because they are offline or unable to complete a sync successfully." + ); + if history.min_ack < history.current_version { + println!( + "You may force progress using `garage layout skip-dead-nodes --version {}`", + history.current_version + ); + } else { + println!( + "You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.", + history.current_version + ); + } + } else { + println!( + "Your cluster is currently in a stable state with a single live layout version." 
+ ); + println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,"); + println!( + "so you might want to keep old nodes online until their data directories become empty." + ); + } + + Ok(()) + } + + pub async fn cmd_skip_dead_nodes(&self, opt: SkipDeadNodesOpt) -> Result<(), Error> { + let res = self + .api_request(ClusterLayoutSkipDeadNodesRequest { + version: opt.version, + allow_missing_data: opt.allow_missing_data, + }) + .await?; + + if !res.sync_updated.is_empty() || !res.ack_updated.is_empty() { + for node in res.ack_updated.iter() { + println!("Increased the ACK tracker for node {:.16}", node); + } + for node in res.sync_updated.iter() { + println!("Increased the SYNC tracker for node {:.16}", node); + } + Ok(()) + } else if !opt.allow_missing_data { + Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into())) + } else { + Err(Error::Message( + "Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(), + )) + } + } +} + +// -------------------------- +// ---- helper functions ---- +// -------------------------- + +pub fn capacity_string(v: Option<u64>) -> String { + match v { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } +} + +pub fn get_staged_or_current_role( + id: &str, + layout: &GetClusterLayoutResponse, +) -> Option<NodeAssignedRole> { + for node in layout.staged_role_changes.iter() { + if node.id == id { + return match &node.action { + NodeRoleChangeEnum::Remove { .. } => None, + NodeRoleChangeEnum::Update(role) => Some(role.clone()), + }; + } + } + + for node in layout.roles.iter() { + if node.id == id { + return Some(NodeAssignedRole { + zone: node.zone.clone(), + capacity: node.capacity, + tags: node.tags.clone(), + }); + } + } + + None +} + +pub fn find_matching_node<'a>( + status: &GetClusterStatusResponse, + layout: &GetClusterLayoutResponse, + pattern: &'a str, +) -> Result<String, Error> { + let all_node_ids_iter = status + .nodes + .iter() + .map(|x| x.id.as_str()) + .chain(layout.roles.iter().map(|x| x.id.as_str())); + + let mut candidates = vec![]; + for c in all_node_ids_iter { + if c.starts_with(pattern) && !candidates.contains(&c) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0].to_string()) + } +} + +pub fn print_cluster_layout(layout: &GetClusterLayoutResponse, empty_msg: &str) { + let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; + for role in layout.roles.iter() { + let tags = role.tags.join(","); + if let (Some(capacity), Some(usable_capacity)) = (role.capacity, role.usable_capacity) { + table.push(format!( + "{:.16}\t{}\t{}\t{}\t{} ({:.1}%)", + role.id, + tags, + role.zone, + capacity_string(role.capacity), + ByteSize::b(usable_capacity).to_string_as(false), + (100.0 * usable_capacity as f32) / (capacity as f32) + )); + } else { + table.push(format!( + "{:.16}\t{}\t{}\t{}", + role.id, + tags, + role.zone, + capacity_string(role.capacity), + )); + }; + } + if table.len() > 1 { + format_table(table); + println!(); + println!( + "Zone redundancy: {}", + display_zone_redundancy(layout.parameters.zone_redundancy), + ); + } else { + println!("{}", empty_msg); + } +} + +pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool { + let
has_role_changes = !layout.staged_role_changes.is_empty(); + + let has_layout_changes = layout.staged_parameters.is_some(); + + if has_role_changes || has_layout_changes { + println!(); + println!("==== STAGED ROLE CHANGES ===="); + if has_role_changes { + let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + for change in layout.staged_role_changes.iter() { + match &change.action { + NodeRoleChangeEnum::Update(NodeAssignedRole { + tags, + zone, + capacity, + }) => { + let tags = tags.join(","); + table.push(format!( + "{:.16}\t{}\t{}\t{}", + change.id, + tags, + zone, + capacity_string(*capacity), + )); + } + NodeRoleChangeEnum::Remove { .. } => { + table.push(format!("{:.16}\tREMOVED", change.id)); + } + } + } + format_table(table); + println!(); + } + if let Some(p) = layout.staged_parameters.as_ref() { + println!( + "Zone redundancy: {}", + display_zone_redundancy(p.zone_redundancy) + ); + } + true + } else { + false + } +} + +pub fn display_zone_redundancy(z: ZoneRedundancy) -> String { + match z { + ZoneRedundancy::Maximum => "maximum".into(), + ZoneRedundancy::AtLeast(x) => x.to_string(), + } +} + +pub fn parse_zone_redundancy(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x.parse::().map_err(|_| { + Error::Message("zone redundancy must be 'none'/'max' or an integer".into()) + })?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } +} diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli/remote/mod.rs similarity index 97% rename from src/garage/cli_v2/mod.rs rename to src/garage/cli/remote/mod.rs index 28c7c824..40673b91 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli/remote/mod.rs @@ -13,7 +13,6 @@ use std::time::Duration; use garage_util::error::*; -use garage_rpc::system::*; use garage_rpc::*; use garage_api_admin::api::*; @@ -23,7 +22,6 @@ use garage_api_admin::RequestHandler; use crate::cli::structs::*; pub struct Cli { - pub system_rpc_endpoint: Arc>, pub proxy_rpc_endpoint: Arc>, pub rpc_host: NodeID, } diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli/remote/node.rs similarity index 99% rename from src/garage/cli_v2/node.rs rename to src/garage/cli/remote/node.rs index c5d0cdea..419d6bf7 100644 --- a/src/garage/cli_v2/node.rs +++ b/src/garage/cli/remote/node.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> { diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli/remote/worker.rs similarity index 99% rename from src/garage/cli_v2/worker.rs rename to src/garage/cli/remote/worker.rs index 9c248a39..45f0b3cd 100644 --- a/src/garage/cli_v2/worker.rs +++ b/src/garage/cli/remote/worker.rs @@ -4,8 +4,8 @@ use garage_util::error::*; use garage_api_admin::api::*; +use crate::cli::remote::*; use crate::cli::structs::*; -use crate::cli_v2::*; impl Cli { pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 58d066b3..0af92c35 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -2,7 +2,7 @@ use structopt::StructOpt; use garage_util::version::garage_version; -use crate::cli::convert_db; +use crate::cli::local::convert_db; #[derive(StructOpt, Debug)] pub enum Command { diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs deleted file mode 100644 index 2f14b332..00000000 --- 
a/src/garage/cli_v2/layout.rs +++ /dev/null @@ -1,284 +0,0 @@ -use bytesize::ByteSize; -use format_table::format_table; - -use garage_util::error::*; - -use garage_api_admin::api::*; - -use crate::cli::layout as cli_v1; -use crate::cli::structs::*; -use crate::cli_v2::*; - -impl Cli { - pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> { - match cmd { - LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await, - LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await, - LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await, - LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await, - - // TODO - LayoutOperation::Show => { - cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await - } - LayoutOperation::Config(config_opt) => { - cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt) - .await - } - LayoutOperation::History => { - cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await - } - LayoutOperation::SkipDeadNodes(assume_sync_opt) => { - cli_v1::cmd_layout_skip_dead_nodes( - &self.system_rpc_endpoint, - self.rpc_host, - assume_sync_opt, - ) - .await - } - } - } - - pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> { - let status = self.api_request(GetClusterStatusRequest).await?; - let layout = self.api_request(GetClusterLayoutRequest).await?; - - let all_node_ids_iter = status - .nodes - .iter() - .map(|x| x.id.as_str()) - .chain(layout.roles.iter().map(|x| x.id.as_str())); - - let mut actions = vec![]; - - for node in opt.replace.iter() { - let id = find_matching_node(all_node_ids_iter.clone(), &node)?; - - actions.push(NodeRoleChange { - id, - action: NodeRoleChangeEnum::Remove { remove: true }, - }); - } - - for node in opt.node_ids.iter() { - let id = find_matching_node(all_node_ids_iter.clone(), &node)?; - - let current = get_staged_or_current_role(&id, &layout); - - let zone = opt - .zone - .clone() - .or_else(|| current.as_ref().map(|c| c.zone.clone())) - .ok_or_message("Please specify a zone with the -z flag")?; - - let capacity = if opt.gateway { - if opt.capacity.is_some() { - return Err(Error::Message("Please specify only -c or -g".into())); - } - None - } else if let Some(cap) = opt.capacity { - Some(cap.as_u64()) - } else { - current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity - }; - - let tags = if !opt.tags.is_empty() { - opt.tags.clone() - } else if let Some(cur) = current.as_ref() { - cur.tags.clone() - } else { - vec![] - }; - - actions.push(NodeRoleChange { - id, - action: NodeRoleChangeEnum::Update { - zone, - capacity, - tags, - }, - }); - } - - self.api_request(UpdateClusterLayoutRequest(actions)) - .await?; - - println!("Role changes are staged but not yet committed."); - println!("Use `garage layout show` to view staged role changes,"); - println!("and `garage layout apply` to enact staged changes."); - Ok(()) - } - - pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> { - let status = self.api_request(GetClusterStatusRequest).await?; - let layout = self.api_request(GetClusterLayoutRequest).await?; - - let all_node_ids_iter = status - .nodes - .iter() - .map(|x| x.id.as_str()) - .chain(layout.roles.iter().map(|x| x.id.as_str())); - - let id = find_matching_node(all_node_ids_iter.clone(), &opt.node_id)?; - - let actions = vec![NodeRoleChange { - id, - 
action: NodeRoleChangeEnum::Remove { remove: true }, - }]; - - self.api_request(UpdateClusterLayoutRequest(actions)) - .await?; - - println!("Role removal is staged but not yet committed."); - println!("Use `garage layout show` to view staged role changes,"); - println!("and `garage layout apply` to enact staged changes."); - Ok(()) - } - - pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> { - let missing_version_error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - - let req = ApplyClusterLayoutRequest { - version: apply_opt.version.ok_or_message(missing_version_error)?, - }; - let res = self.api_request(req).await?; - - for line in res.message.iter() { - println!("{}", line); - } - - println!("New cluster layout with updated role assignment has been applied in cluster."); - println!("Data will now be moved around between nodes accordingly."); - - Ok(()) - } - - pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> { - if !revert_opt.yes { - return Err(Error::Message( - "Please add the --yes flag to run the layout revert operation".into(), - )); - } - - self.api_request(RevertClusterLayoutRequest).await?; - - println!("All proposed role changes in cluster layout have been canceled."); - Ok(()) - } -} - -// -------------------------- -// ---- helper functions ---- -// -------------------------- - -pub fn capacity_string(v: Option) -> String { - match v { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } -} - -pub fn get_staged_or_current_role( - id: &str, - layout: &GetClusterLayoutResponse, -) -> Option { - for node in layout.staged_role_changes.iter() { - if node.id == id { - return match &node.action { - NodeRoleChangeEnum::Remove { .. } => None, - NodeRoleChangeEnum::Update { - zone, - capacity, - tags, - } => Some(NodeRoleResp { - id: id.to_string(), - zone: zone.to_string(), - capacity: *capacity, - tags: tags.clone(), - }), - }; - } - } - - for node in layout.roles.iter() { - if node.id == id { - return Some(node.clone()); - } - } - - None -} - -pub fn find_matching_node<'a>( - cand: impl std::iter::Iterator, - pattern: &'a str, -) -> Result { - let mut candidates = vec![]; - for c in cand { - if c.starts_with(pattern) && !candidates.contains(&c) { - candidates.push(c); - } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0].to_string()) - } -} - -pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool { - let has_role_changes = !layout.staged_role_changes.is_empty(); - - // TODO!! Layout parameters - let has_layout_changes = false; - - if has_role_changes || has_layout_changes { - println!(); - println!("==== STAGED ROLE CHANGES ===="); - if has_role_changes { - let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for change in layout.staged_role_changes.iter() { - match &change.action { - NodeRoleChangeEnum::Update { - tags, - zone, - capacity, - } => { - let tags = tags.join(","); - table.push(format!( - "{:.16}\t{}\t{}\t{}", - change.id, - tags, - zone, - capacity_string(*capacity), - )); - } - NodeRoleChangeEnum::Remove { .. 
} => { - table.push(format!("{:.16}\tREMOVED", change.id)); - } - } - } - format_table(table); - println!(); - } - //TODO - /* - if has_layout_changes { - println!( - "Zone redundancy: {}", - staging.parameters.get().zone_redundancy - ); - } - */ - true - } else { - false - } -} diff --git a/src/garage/main.rs b/src/garage/main.rs index 683042d9..a72b860c 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -5,7 +5,6 @@ extern crate tracing; mod cli; -mod cli_v2; mod secrets; mod server; #[cfg(feature = "telemetry-otlp")] @@ -144,13 +143,13 @@ async fn main() { let res = match opt.cmd { Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await + cli::local::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::ConvertDb(conv_opt) => { - cli::convert_db::do_conversion(conv_opt).map_err(From::from) + cli::local::convert_db::do_conversion(conv_opt).map_err(From::from) } Command::Node(NodeOperation::NodeId(node_id_opt)) => { - cli::init::node_id_command(opt.config_file, node_id_opt.quiet) + cli::local::init::node_id_command(opt.config_file, node_id_opt.quiet) } Command::AdminApiSchema => { println!( @@ -260,7 +259,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) - .err_context(cli::init::READ_KEY_ERROR)?; + .err_context(cli::local::init::READ_KEY_ERROR)?; if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) { use std::net::ToSocketAddrs; let a = a @@ -289,11 +288,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct full-length node ID (public key).")?; } - let system_rpc_endpoint = netapp.endpoint::(SYSTEM_RPC_PATH.into()); let proxy_rpc_endpoint = netapp.endpoint::(PROXY_RPC_PATH.into()); - let cli = cli_v2::Cli { - system_rpc_endpoint, + let cli = cli::remote::Cli { proxy_rpc_endpoint, rpc_host: id, }; diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 574c50c2..16c32fb2 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -267,20 +267,9 @@ impl LayoutHistory { changed } - pub fn apply_staged_changes(mut self, version: Option) -> Result<(Self, Message), Error> { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. 
- "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } + pub fn apply_staged_changes(mut self, version: u64) -> Result<(Self, Message), Error> { + if version != self.current().version + 1 { + return Err(Error::Message("Invalid new layout version".into())); } // Compute new version and add it to history diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index ce21a524..cfd576a7 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,5 +1,3 @@ -use std::fmt; - use bytesize::ByteSize; use garage_util::crdt::{AutoCrdt, Crdt}; @@ -397,30 +395,6 @@ impl NodeRole { } } -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; @@ -455,7 +429,7 @@ impl UpdateTracker { } } - pub fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { + fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { storage_nodes .iter() .map(|x| self.get(x, min_version)) diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs index 5462160b..2d29914e 100644 --- a/src/rpc/layout/test.rs +++ b/src/rpc/layout/test.rs @@ -124,7 +124,7 @@ fn test_assignment() { let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap()); update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); let v = cl.current().version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap(); show_msg(&msg); assert_eq!(cl.check(), Ok(())); assert!(check_against_naive(cl.current()).unwrap()); @@ -133,7 +133,7 @@ fn test_assignment() { node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]; update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2); let v = cl.current().version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap(); show_msg(&msg); assert_eq!(cl.check(), Ok(())); assert!(check_against_naive(cl.current()).unwrap()); @@ -141,7 +141,7 @@ fn test_assignment() { node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000]; update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); let v = cl.current().version; - let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap(); show_msg(&msg); assert_eq!(cl.check(), Ok(())); assert!(check_against_naive(cl.current()).unwrap()); @@ -151,7 +151,7 @@ fn test_assignment() { ]; update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1); let v = cl.current().version; - let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); + let (cl, msg) = cl.apply_staged_changes(v + 1).unwrap(); show_msg(&msg); assert_eq!(cl.check(), Ok(())); assert!(check_against_naive(cl.current()).unwrap());
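For reference, a minimal standalone sketch of the zone-redundancy parsing and display rules implemented by the new `parse_zone_redundancy` and `display_zone_redundancy` helpers in src/garage/cli/remote/layout.rs (which replace the `Display`/`FromStr` impls removed from src/rpc/layout/mod.rs). The `ZoneRedundancy` enum and the `String` error type below are local stand-ins for the real types from garage_rpc and garage_util; only the matching logic is taken from the patch.

// Standalone sketch: zone-redundancy parsing/printing as done by the new CLI helpers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ZoneRedundancy {
    Maximum,
    AtLeast(usize),
}

fn display_zone_redundancy(z: ZoneRedundancy) -> String {
    match z {
        ZoneRedundancy::Maximum => "maximum".into(),
        ZoneRedundancy::AtLeast(x) => x.to_string(),
    }
}

fn parse_zone_redundancy(s: &str) -> Result<ZoneRedundancy, String> {
    match s {
        // "none" and "max" are accepted as aliases for maximum spreading
        "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
        x => x
            .parse::<usize>()
            .map(ZoneRedundancy::AtLeast)
            .map_err(|_| "zone redundancy must be 'none'/'max' or an integer".to_string()),
    }
}

fn main() {
    assert_eq!(parse_zone_redundancy("max"), Ok(ZoneRedundancy::Maximum));
    assert_eq!(parse_zone_redundancy("2"), Ok(ZoneRedundancy::AtLeast(2)));
    assert!(parse_zone_redundancy("two").is_err());
    // Values round-trip through the display helper, except that the
    // aliases all normalize to "maximum".
    assert_eq!(display_zone_redundancy(ZoneRedundancy::AtLeast(2)), "2");
    assert_eq!(display_zone_redundancy(ZoneRedundancy::Maximum), "maximum");
}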
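The reworked `find_matching_node` resolves a possibly abbreviated node ID by prefix against the IDs reported in the cluster status and layout responses, and refuses anything but exactly one match. A self-contained sketch of that resolution rule, using made-up example node IDs and a `String` error instead of Garage's error type.

// Sketch: prefix-based node ID resolution, as in find_matching_node above.
fn find_matching_node<'a>(
    candidates: impl Iterator<Item = &'a str>,
    pattern: &str,
) -> Result<String, String> {
    // Keep every distinct candidate that starts with the given prefix.
    let mut matches: Vec<&str> = vec![];
    for c in candidates {
        if c.starts_with(pattern) && !matches.contains(&c) {
            matches.push(c);
        }
    }
    // The prefix must identify exactly one node; otherwise it is ambiguous
    // (or unknown) and the command is rejected.
    if matches.len() == 1 {
        Ok(matches[0].to_string())
    } else {
        Err(format!("{} nodes match '{}'", matches.len(), pattern))
    }
}

fn main() {
    // Hypothetical node IDs, for illustration only.
    let ids = ["4a6ae5a1b08d7e9f", "4bd23c060ddfca24", "8f572a01b1cd3f10"];
    assert_eq!(
        find_matching_node(ids.iter().copied(), "8f"),
        Ok("8f572a01b1cd3f10".to_string())
    );
    // "4" is a prefix of two different node IDs: ambiguous, so an error.
    assert!(find_matching_node(ids.iter().copied(), "4").is_err());
}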
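The skip-dead-nodes logic deleted from src/garage/cli/layout.rs (and now served by the ClusterLayoutSkipDeadNodes admin endpoint that `cmd_skip_dead_nodes` calls) advances the per-node ACK tracker for nodes that are down, or for every node when `--allow-missing-data` is passed, and advances the SYNC tracker only in that latter case; a tracker is only ever moved forward. A sketch of that update rule over plain `HashMap`s with hypothetical node names; the real trackers are garage_rpc's update trackers keyed by node UUIDs, and `set_max` mirrors their forward-only update.

use std::collections::HashMap;

// Forward-only tracker update, mirroring set_max on Garage's update trackers.
fn set_max(tracker: &mut HashMap<String, u64>, node: &str, version: u64) -> bool {
    let entry = tracker.entry(node.to_string()).or_insert(0);
    if version > *entry {
        *entry = version;
        true
    } else {
        false
    }
}

// Returns the lists of nodes whose ACK / SYNC trackers were advanced,
// like the ackUpdated / syncUpdated fields of the admin API response.
fn skip_dead_nodes(
    ack: &mut HashMap<String, u64>,
    sync: &mut HashMap<String, u64>,
    nodes_up: &HashMap<String, bool>,
    version: u64,
    allow_missing_data: bool,
) -> (Vec<String>, Vec<String>) {
    let mut ack_updated = vec![];
    let mut sync_updated = vec![];
    for (node, &up) in nodes_up.iter() {
        // ACK tracker: forced forward for dead nodes, or for everyone when
        // missing data is explicitly accepted.
        if (allow_missing_data || !up) && set_max(ack, node, version) {
            ack_updated.push(node.clone());
        }
        // SYNC tracker: only forced when --allow-missing-data is passed.
        if allow_missing_data && set_max(sync, node, version) {
            sync_updated.push(node.clone());
        }
    }
    (ack_updated, sync_updated)
}

fn main() {
    let mut ack = HashMap::new();
    let mut sync = HashMap::new();
    // Hypothetical two-node cluster where "n2" is offline.
    let nodes_up: HashMap<String, bool> =
        [("n1".to_string(), true), ("n2".to_string(), false)].into();
    let (acks, syncs) = skip_dead_nodes(&mut ack, &mut sync, &nodes_up, 3, false);
    assert_eq!(acks, vec!["n2".to_string()]); // only the dead node is skipped ahead
    assert!(syncs.is_empty()); // SYNC untouched without --allow-missing-data
}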
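The old `cmd_layout_history` derived each version's status from the version numbers themselves: the current version, versions at or above `min_stored` (still draining data), and everything older. This appears to be the same classification behind the statuses that the new `cmd_layout_history` prints via `ver.status`. A sketch of that rule, using a local copy of the status enum as a stand-in for the API type.

// Sketch: layout-version status classification used by `garage layout history`.
#[derive(Debug, PartialEq, Eq)]
enum ClusterLayoutVersionStatus {
    Current,
    Draining,
    Historical,
}

fn classify(version: u64, current: u64, min_stored: u64) -> ClusterLayoutVersionStatus {
    if version == current {
        ClusterLayoutVersionStatus::Current
    } else if version >= min_stored {
        // Older version that may still hold data being migrated away.
        ClusterLayoutVersionStatus::Draining
    } else {
        ClusterLayoutVersionStatus::Historical
    }
}

fn main() {
    // Hypothetical history: versions 3..=5 exist, data is still stored for >= 4.
    assert_eq!(classify(5, 5, 4), ClusterLayoutVersionStatus::Current);
    assert_eq!(classify(4, 5, 4), ClusterLayoutVersionStatus::Draining);
    assert_eq!(classify(3, 5, 4), ClusterLayoutVersionStatus::Historical);
}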
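After this patch, `LayoutHistory::apply_staged_changes` takes a plain `u64` and the "please pass the version number" prompt lives in the CLI (`cmd_apply_layout`); the only check left in the layout code is that the requested version is exactly the current version plus one. A tiny sketch of that guard, with a `String` error in place of Garage's error type.

// Sketch: the version guard kept in apply_staged_changes after the refactor.
fn check_new_layout_version(requested: u64, current: u64) -> Result<(), String> {
    if requested != current + 1 {
        Err("Invalid new layout version".to_string())
    } else {
        Ok(())
    }
}

fn main() {
    assert!(check_new_layout_version(6, 5).is_ok());
    assert!(check_new_layout_version(7, 5).is_err()); // skipping a version is refused
    assert!(check_new_layout_version(5, 5).is_err()); // re-applying the current version is refused
}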