Merge pull request 'admin refactoring, step 4' (#980) from refactor-admin into next-v2

Reviewed-on: #980
Alex 2025-03-11 09:19:12 +00:00
commit 85a07c87d7
26 changed files with 1456 additions and 1386 deletions

@ -157,6 +157,40 @@
}
}
},
"/v2/ClusterLayoutSkipDeadNodes": {
"post": {
"tags": [
"Cluster layout"
],
"description": "Force progress in layout update trackers",
"operationId": "ClusterLayoutSkipDeadNodes",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Request has been taken into account",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesResponse"
}
}
}
},
"500": {
"description": "Internal server error"
}
}
}
},
"/v2/ConnectClusterNodes": {
"post": {
"tags": [
@ -512,6 +546,30 @@
}
}
},
"/v2/GetClusterLayoutHistory": {
"get": {
"tags": [
"Cluster layout"
],
"description": "\nReturns the history of layouts in the cluster\n ",
"operationId": "GetClusterLayoutHistory",
"responses": {
"200": {
"description": "Cluster layout history",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/GetClusterLayoutHistoryResponse"
}
}
}
},
"500": {
"description": "Internal server error"
}
}
}
},
"/v2/GetClusterStatistics": {
"get": {
"tags": [
@ -950,6 +1008,30 @@
}
}
},
"/v2/PreviewClusterLayoutChanges": {
"post": {
"tags": [
"Cluster layout"
],
"description": "\nComputes a new layout taking into account the staged parameters, and returns it with detailed statistics. The new layout is not applied in the cluster.\n\n*Note: do not try to parse the `message` field of the response, it is given as an array of string specifically because its format is not stable.*\n ",
"operationId": "PreviewClusterLayoutChanges",
"responses": {
"200": {
"description": "Information about the new layout",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/PreviewClusterLayoutChangesResponse"
}
}
}
},
"500": {
"description": "Internal server error"
}
}
}
},
"/v2/PurgeBlocks": {
"post": {
"tags": [
@ -1330,6 +1412,7 @@
"version": {
"type": "integer",
"format": "int64",
"description": "As a safety measure, the new version number of the layout must\nbe specified here",
"minimum": 0
}
}
@ -1342,13 +1425,15 @@
],
"properties": {
"layout": {
"$ref": "#/components/schemas/GetClusterLayoutResponse"
"$ref": "#/components/schemas/GetClusterLayoutResponse",
"description": "Details about the new cluster layout"
},
"message": {
"type": "array",
"items": {
"type": "string"
}
},
"description": "Plain-text information about the layout computation\n(do not try to parse this)"
}
}
},
@ -1576,6 +1661,89 @@
}
}
},
"ClusterLayoutSkipDeadNodesRequest": {
"type": "object",
"required": [
"version",
"allowMissingData"
],
"properties": {
"allowMissingData": {
"type": "boolean",
"description": "Allow the skip even if a quorum of nodes could not be found for\nthe data among the remaining nodes"
},
"version": {
"type": "integer",
"format": "int64",
"description": "Version number of the layout to assume is currently up-to-date.\nThis will generally be the current layout version.",
"minimum": 0
}
}
},
"ClusterLayoutSkipDeadNodesResponse": {
"type": "object",
"required": [
"ackUpdated",
"syncUpdated"
],
"properties": {
"ackUpdated": {
"type": "array",
"items": {
"type": "string"
},
"description": "Nodes for which the ACK update tracker has been updated to `version`"
},
"syncUpdated": {
"type": "array",
"items": {
"type": "string"
},
"description": "If `allow_missing_data` is set,\nnodes for which the SYNC update tracker has been updated to `version`"
}
}
},
"ClusterLayoutVersion": {
"type": "object",
"required": [
"version",
"status",
"storageNodes",
"gatewayNodes"
],
"properties": {
"gatewayNodes": {
"type": "integer",
"format": "int64",
"description": "Number of nodes with a gateway role in this layout version",
"minimum": 0
},
"status": {
"$ref": "#/components/schemas/ClusterLayoutVersionStatus",
"description": "Status of this layout version"
},
"storageNodes": {
"type": "integer",
"format": "int64",
"description": "Number of nodes with an assigned storage capacity in this layout version",
"minimum": 0
},
"version": {
"type": "integer",
"format": "int64",
"description": "Version number of this layout version",
"minimum": 0
}
}
},
"ClusterLayoutVersionStatus": {
"type": "string",
"enum": [
"Current",
"Draining",
"Historical"
]
},
"ConnectClusterNodesRequest": {
"type": "array",
"items": {
@ -1679,11 +1847,13 @@
"available": {
"type": "integer",
"format": "int64",
"description": "Number of bytes available",
"minimum": 0
},
"total": {
"type": "integer",
"format": "int64",
"description": "Total number of bytes",
"minimum": 0
}
}
@ -1870,29 +2040,97 @@
}
}
},
"GetClusterLayoutHistoryResponse": {
"type": "object",
"required": [
"currentVersion",
"minAck",
"versions"
],
"properties": {
"currentVersion": {
"type": "integer",
"format": "int64",
"description": "The current version number of the cluster layout",
"minimum": 0
},
"minAck": {
"type": "integer",
"format": "int64",
"description": "All nodes in the cluster are aware of layout versions up to\nthis version number (at least)",
"minimum": 0
},
"updateTrackers": {
"type": [
"object",
"null"
],
"description": "Detailed update trackers for nodes (see\n`https://garagehq.deuxfleurs.fr/blog/2023-12-preserving-read-after-write-consistency/`)",
"additionalProperties": {
"$ref": "#/components/schemas/NodeUpdateTrackers"
},
"propertyNames": {
"type": "string"
}
},
"versions": {
"type": "array",
"items": {
"$ref": "#/components/schemas/ClusterLayoutVersion"
},
"description": "Layout version history"
}
}
},
"GetClusterLayoutResponse": {
"type": "object",
"required": [
"version",
"roles",
"parameters",
"partitionSize",
"stagedRoleChanges"
],
"properties": {
"parameters": {
"$ref": "#/components/schemas/LayoutParameters",
"description": "Layout parameters used when the current layout was computed"
},
"partitionSize": {
"type": "integer",
"format": "int64",
"description": "The size, in bytes, of one Garage partition (= a shard)",
"minimum": 0
},
"roles": {
"type": "array",
"items": {
"$ref": "#/components/schemas/NodeRoleResp"
}
"$ref": "#/components/schemas/LayoutNodeRole"
},
"description": "List of nodes that currently have a role in the cluster layout"
},
"stagedParameters": {
"oneOf": [
{
"type": "null"
},
{
"$ref": "#/components/schemas/LayoutParameters",
"description": "Layout parameters to use when computing the next version of\nthe cluster layout"
}
]
},
"stagedRoleChanges": {
"type": "array",
"items": {
"$ref": "#/components/schemas/NodeRoleChange"
}
},
"description": "List of nodes that will have a new role or whose role will be\nremoved in the next version of the cluster layout"
},
"version": {
"type": "integer",
"format": "int64",
"description": "The current version number of the cluster layout",
"minimum": 0
}
}
@ -1918,13 +2156,15 @@
"layoutVersion": {
"type": "integer",
"format": "int64",
"description": "Current version number of the cluster layout",
"minimum": 0
},
"nodes": {
"type": "array",
"items": {
"$ref": "#/components/schemas/NodeResp"
}
},
"description": "List of nodes that are either currently connected, part of the\ncurrent cluster layout, or part of an older cluster layout that\nis still active in the cluster (being drained)."
}
}
},
@ -2021,6 +2261,70 @@
}
}
},
"LayoutNodeRole": {
"type": "object",
"required": [
"id",
"zone",
"tags"
],
"properties": {
"capacity": {
"type": [
"integer",
"null"
],
"format": "int64",
"description": "Capacity (in bytes) assigned by the cluster administrator,\nabsent for gateway nodes",
"minimum": 0
},
"id": {
"type": "string",
"description": "Identifier of the node"
},
"storedPartitions": {
"type": [
"integer",
"null"
],
"format": "int64",
"description": "Number of partitions stored on this node\n(a result of the layout computation)",
"minimum": 0
},
"tags": {
"type": "array",
"items": {
"type": "string"
},
"description": "List of tags assigned by the cluster administrator"
},
"usableCapacity": {
"type": [
"integer",
"null"
],
"format": "int64",
"description": "Capacity (in bytes) that is actually usable on this node in the current\nlayout, which is equal to `stored_partitions` × `partition_size`",
"minimum": 0
},
"zone": {
"type": "string",
"description": "Zone name assigned by the cluster administrator"
}
}
},
"LayoutParameters": {
"type": "object",
"required": [
"zoneRedundancy"
],
"properties": {
"zoneRedundancy": {
"$ref": "#/components/schemas/ZoneRedundancy",
"description": "Minimum number of zones in which a data partition must be replicated"
}
}
},
"ListBucketsResponse": {
"type": "array",
"items": {
@ -2804,6 +3108,35 @@
}
}
},
"NodeAssignedRole": {
"type": "object",
"required": [
"zone",
"tags"
],
"properties": {
"capacity": {
"type": [
"integer",
"null"
],
"format": "int64",
"description": "Capacity (in bytes) assigned by the cluster administrator,\nabsent for gateway nodes",
"minimum": 0
},
"tags": {
"type": "array",
"items": {
"type": "string"
},
"description": "List of tags assigned by the cluster administrator"
},
"zone": {
"type": "string",
"description": "Zone name assigned by the cluster administrator"
}
}
},
"NodeResp": {
"type": "object",
"required": [
@ -2816,7 +3149,8 @@
"type": [
"string",
"null"
]
],
"description": "Socket address used by other nodes to connect to this node for RPC"
},
"dataPartition": {
"oneOf": [
@ -2824,24 +3158,29 @@
"type": "null"
},
{
"$ref": "#/components/schemas/FreeSpaceResp"
"$ref": "#/components/schemas/FreeSpaceResp",
"description": "Total and available space on the disk partition(s) containing the data\ndirectory(ies)"
}
]
},
"draining": {
"type": "boolean"
"type": "boolean",
"description": "Whether this node is part of an older layout version and is draining data."
},
"hostname": {
"type": [
"string",
"null"
]
],
"description": "Hostname of the node"
},
"id": {
"type": "string"
"type": "string",
"description": "Full-length node identifier"
},
"isUp": {
"type": "boolean"
"type": "boolean",
"description": "Whether this node is connected in the cluster"
},
"lastSeenSecsAgo": {
"type": [
@ -2849,6 +3188,7 @@
"null"
],
"format": "int64",
"description": "For disconnected nodes, the number of seconds since last contact,\nor `null` if no contact was established since Garage restarted.",
"minimum": 0
},
"metadataPartition": {
@ -2857,7 +3197,8 @@
"type": "null"
},
{
"$ref": "#/components/schemas/FreeSpaceResp"
"$ref": "#/components/schemas/FreeSpaceResp",
"description": "Total and available space on the disk partition containing the\nmetadata directory"
}
]
},
@ -2867,7 +3208,8 @@
"type": "null"
},
{
"$ref": "#/components/schemas/NodeRoleResp"
"$ref": "#/components/schemas/NodeAssignedRole",
"description": "Role assigned to this node in the current cluster layout"
}
]
}
@ -2906,67 +3248,72 @@
}
}
},
{
"$ref": "#/components/schemas/NodeAssignedRole"
}
]
},
"NodeUpdateTrackers": {
"type": "object",
"required": [
"ack",
"sync",
"syncAck"
],
"properties": {
"ack": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"sync": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"syncAck": {
"type": "integer",
"format": "int64",
"minimum": 0
}
}
},
"PreviewClusterLayoutChangesResponse": {
"oneOf": [
{
"type": "object",
"required": [
"zone",
"tags"
"error"
],
"properties": {
"capacity": {
"type": [
"integer",
"null"
],
"format": "int64",
"description": "New capacity (in bytes) of the node",
"minimum": 0
},
"tags": {
"error": {
"type": "string",
"description": "Error message indicating that the layout could not be computed\nwith the provided configuration"
}
}
},
{
"type": "object",
"required": [
"message",
"newLayout"
],
"properties": {
"message": {
"type": "array",
"items": {
"type": "string"
},
"description": "New tags of the node"
"description": "Plain-text information about the layout computation\n(do not try to parse this)"
},
"zone": {
"type": "string",
"description": "New zone of the node"
"newLayout": {
"$ref": "#/components/schemas/GetClusterLayoutResponse",
"description": "Details about the new cluster layout"
}
}
}
]
},
"NodeRoleResp": {
"type": "object",
"required": [
"id",
"zone",
"tags"
],
"properties": {
"capacity": {
"type": [
"integer",
"null"
],
"format": "int64",
"minimum": 0
},
"id": {
"type": "string"
},
"tags": {
"type": "array",
"items": {
"type": "string"
}
},
"zone": {
"type": "string"
}
}
},
"RemoveBucketAliasRequest": {
"allOf": [
{
@ -3109,9 +3456,26 @@
}
},
"UpdateClusterLayoutRequest": {
"type": "array",
"items": {
"$ref": "#/components/schemas/NodeRoleChange"
"type": "object",
"properties": {
"parameters": {
"oneOf": [
{
"type": "null"
},
{
"$ref": "#/components/schemas/LayoutParameters",
"description": "New layout computation parameters to use"
}
]
},
"roles": {
"type": "array",
"items": {
"$ref": "#/components/schemas/NodeRoleChange"
},
"description": "New node roles to assign or remove in the cluster layout"
}
}
},
"UpdateClusterLayoutResponse": {
@ -3289,6 +3653,31 @@
]
}
]
},
"ZoneRedundancy": {
"oneOf": [
{
"type": "object",
"description": "Partitions must be replicated in at least this number of\ndistinct zones.",
"required": [
"atLeast"
],
"properties": {
"atLeast": {
"type": "integer",
"description": "Partitions must be replicated in at least this number of\ndistinct zones.",
"minimum": 0
}
}
},
{
"type": "string",
"description": "Partitions must be replicated in as many zones as possible:\nas many zones as there are replicas, if there are enough distinct\nzones, or at least one in each zone otherwise.",
"enum": [
"maximum"
]
}
]
}
},
"securitySchemes": {

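As a quick illustration of the new endpoint shapes defined above, a call to POST /v2/ClusterLayoutSkipDeadNodes could look as follows on the wire. This is hand-assembled from the request and response schemas in this file; the version number and the (shortened) node identifier are invented.

Request body:

{
  "version": 3,
  "allowMissingData": false
}

Response (200):

{
  "ackUpdated": [
    "7e97d7a4d0c0f596"
  ],
  "syncUpdated": []
}

With allowMissingData set to false, only the ACK tracker of nodes that are currently down is advanced, so syncUpdated stays empty.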
@ -51,9 +51,12 @@ admin_endpoints![
// Layout operations
GetClusterLayout,
GetClusterLayoutHistory,
UpdateClusterLayout,
PreviewClusterLayoutChanges,
ApplyClusterLayout,
RevertClusterLayout,
ClusterLayoutSkipDeadNodes,
// Access key operations
ListKeys,
@ -165,40 +168,61 @@ pub struct GetClusterStatusRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterStatusResponse {
/// Current version number of the cluster layout
pub layout_version: u64,
/// List of nodes that are either currently connected, part of the
/// current cluster layout, or part of an older cluster layout that
/// is still active in the cluster (being drained).
pub nodes: Vec<NodeResp>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct NodeResp {
/// Full-length node identifier
pub id: String,
pub role: Option<NodeRoleResp>,
#[schema(value_type = Option<String> )]
/// Role assigned to this node in the current cluster layout
pub role: Option<NodeAssignedRole>,
/// Socket address used by other nodes to connect to this node for RPC
#[schema(value_type = Option<String>)]
pub addr: Option<SocketAddr>,
/// Hostname of the node
pub hostname: Option<String>,
/// Whether this node is connected in the cluster
pub is_up: bool,
/// For disconnected nodes, the number of seconds since last contact,
/// or `null` if no contact was established since Garage restarted.
pub last_seen_secs_ago: Option<u64>,
/// Whether this node is part of an older layout version and is draining data.
pub draining: bool,
#[serde(skip_serializing_if = "Option::is_none")]
/// Total and available space on the disk partition(s) containing the data
/// directory(ies)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data_partition: Option<FreeSpaceResp>,
#[serde(skip_serializing_if = "Option::is_none")]
/// Total and available space on the disk partition containing the
/// metadata directory
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata_partition: Option<FreeSpaceResp>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct NodeRoleResp {
pub id: String,
pub struct NodeAssignedRole {
/// Zone name assigned by the cluster administrator
pub zone: String,
pub capacity: Option<u64>,
/// List of tags assigned by the cluster administrator
pub tags: Vec<String>,
/// Capacity (in bytes) assigned by the cluster administrator,
/// absent for gateway nodes
pub capacity: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct FreeSpaceResp {
/// Number of bytes available
pub available: u64,
/// Total number of bytes
pub total: u64,
}
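For reference, one entry of the nodes list in a GetClusterStatus response would serialize roughly as below under these definitions; all values are invented and the node identifier is shortened.

{
  "id": "7e97d7a4d0c0f596",
  "role": {
    "zone": "dc1",
    "tags": ["ssd"],
    "capacity": 100000000000
  },
  "addr": "10.0.0.11:3901",
  "hostname": "node1",
  "isUp": true,
  "lastSeenSecsAgo": null,
  "draining": false,
  "dataPartition": {
    "available": 660270088192,
    "total": 873862266880
  },
  "metadataPartition": {
    "available": 660270088192,
    "total": 873862266880
  }
}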
@ -270,9 +294,40 @@ pub struct GetClusterLayoutRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterLayoutResponse {
/// The current version number of the cluster layout
pub version: u64,
pub roles: Vec<NodeRoleResp>,
/// List of nodes that currently have a role in the cluster layout
pub roles: Vec<LayoutNodeRole>,
/// Layout parameters used when the current layout was computed
pub parameters: LayoutParameters,
/// The size, in bytes, of one Garage partition (= a shard)
pub partition_size: u64,
/// List of nodes that will have a new role or whose role will be
/// removed in the next version of the cluster layout
pub staged_role_changes: Vec<NodeRoleChange>,
/// Layout parameters to use when computing the next version of
/// the cluster layout
pub staged_parameters: Option<LayoutParameters>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct LayoutNodeRole {
/// Identifier of the node
pub id: String,
/// Zone name assigned by the cluster administrator
pub zone: String,
/// List of tags assigned by the cluster administrator
pub tags: Vec<String>,
/// Capacity (in bytes) assigned by the cluster administrator,
/// absent for gateway nodes
pub capacity: Option<u64>,
/// Number of partitions stored on this node
/// (a result of the layout computation)
pub stored_partitions: Option<u64>,
/// Capacity (in bytes) that is actually usable on this node in the current
/// layout, which is equal to `stored_partitions` × `partition_size`
pub usable_capacity: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
@ -293,36 +348,139 @@ pub enum NodeRoleChangeEnum {
remove: bool,
},
#[serde(rename_all = "camelCase")]
Update {
/// New zone of the node
zone: String,
/// New capacity (in bytes) of the node
capacity: Option<u64>,
/// New tags of the node
tags: Vec<String>,
},
Update(NodeAssignedRole),
}
#[derive(Copy, Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct LayoutParameters {
/// Minimum number of zones in which a data partition must be replicated
pub zone_redundancy: ZoneRedundancy,
}
#[derive(Copy, Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub enum ZoneRedundancy {
/// Partitions must be replicated in at least this number of
/// distinct zones.
AtLeast(usize),
/// Partitions must be replicated in as many zones as possible:
/// as many zones as there are replicas, if there are enough distinct
/// zones, or at least one in each zone otherwise.
Maximum,
}
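On the wire this enum is externally tagged, so the layout parameters take one of two forms, matching the ZoneRedundancy schema added to the OpenAPI document above:

{ "zoneRedundancy": { "atLeast": 3 } }

or

{ "zoneRedundancy": "maximum" }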
// ---- GetClusterLayoutHistory ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterLayoutHistoryRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterLayoutHistoryResponse {
/// The current version number of the cluster layout
pub current_version: u64,
/// All nodes in the cluster are aware of layout versions up to
/// this version number (at least)
pub min_ack: u64,
/// Layout version history
pub versions: Vec<ClusterLayoutVersion>,
/// Detailed update trackers for nodes (see
/// `https://garagehq.deuxfleurs.fr/blog/2023-12-preserving-read-after-write-consistency/`)
pub update_trackers: Option<HashMap<String, NodeUpdateTrackers>>,
}
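An illustrative GetClusterLayoutHistory response assembled from these definitions (version numbers and the node identifier are invented); note that updateTrackers is only populated while more than one layout version is live, per the handler further down in this diff.

{
  "currentVersion": 2,
  "minAck": 1,
  "versions": [
    { "version": 2, "status": "Current", "storageNodes": 3, "gatewayNodes": 1 },
    { "version": 1, "status": "Draining", "storageNodes": 2, "gatewayNodes": 1 }
  ],
  "updateTrackers": {
    "7e97d7a4d0c0f596": { "ack": 2, "sync": 1, "syncAck": 1 }
  }
}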
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutVersion {
/// Version number of this layout version
pub version: u64,
/// Status of this layout version
pub status: ClusterLayoutVersionStatus,
/// Number of nodes with an assigned storage capacity in this layout version
pub storage_nodes: u64,
/// Number of nodes with a gateway role in this layout version
pub gateway_nodes: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub enum ClusterLayoutVersionStatus {
/// This is the most up-to-date layout version
Current,
/// This version is still active in the cluster because metadata
/// is being rebalanced or migrated from old nodes
Draining,
/// This version is no longer active in the cluster for metadata
/// reads and writes. Note that there is still the possibility
/// that data blocks are being migrated away from nodes in this
/// layout version.
Historical,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct NodeUpdateTrackers {
pub ack: u64,
pub sync: u64,
pub sync_ack: u64,
}
// ---- UpdateClusterLayout ----
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UpdateClusterLayoutRequest(pub Vec<NodeRoleChange>);
pub struct UpdateClusterLayoutRequest {
/// New node roles to assign or remove in the cluster layout
#[serde(default)]
pub roles: Vec<NodeRoleChange>,
/// New layout computation parameters to use
#[serde(default)]
pub parameters: Option<LayoutParameters>,
}
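Since the request is now an object rather than a bare array of role changes, a body for POST /v2/UpdateClusterLayout might look like the sketch below. The shape of each role-change entry (the id next to either a remove flag or the assigned-role fields) is inferred from the NodeRoleChange schema; identifiers and the capacity are invented. Both top-level fields can be omitted thanks to #[serde(default)].

{
  "roles": [
    {
      "id": "7e97d7a4d0c0f596",
      "zone": "dc1",
      "capacity": 100000000000,
      "tags": ["ssd"]
    },
    {
      "id": "e0a9f8c41b2d6375",
      "remove": true
    }
  ],
  "parameters": {
    "zoneRedundancy": "maximum"
  }
}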
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- PreviewClusterLayoutChanges ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreviewClusterLayoutChangesRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(untagged)]
pub enum PreviewClusterLayoutChangesResponse {
#[serde(rename_all = "camelCase")]
Error {
/// Error message indicating that the layout could not be computed
/// with the provided configuration
error: String,
},
#[serde(rename_all = "camelCase")]
Success {
/// Plain-text information about the layout computation
/// (do not try to parse this)
message: Vec<String>,
/// Details about the new cluster layout
new_layout: GetClusterLayoutResponse,
},
}
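Being untagged, the two variants are distinguished purely by their fields. Illustrative payloads follow, with invented values and the success case reduced to a small newLayout; note that usableCapacity equals storedPartitions × partitionSize, consistent with the LayoutNodeRole definition above.

On failure:

{
  "error": "(free-form explanation of why the layout could not be computed)"
}

On success:

{
  "message": [
    "(free-form lines describing the computed layout)"
  ],
  "newLayout": {
    "version": 2,
    "roles": [
      {
        "id": "7e97d7a4d0c0f596",
        "zone": "dc1",
        "tags": ["ssd"],
        "capacity": 100000000000,
        "storedPartitions": 256,
        "usableCapacity": 100000000000
      }
    ],
    "parameters": { "zoneRedundancy": "maximum" },
    "partitionSize": 390625000,
    "stagedParameters": null,
    "stagedRoleChanges": []
  }
}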
// ---- ApplyClusterLayout ----
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ApplyClusterLayoutRequest {
/// As a safety measure, the new version number of the layout must
/// be specified here
pub version: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ApplyClusterLayoutResponse {
/// Plain-text information about the layout computation
/// (do not try to parse this)
pub message: Vec<String>,
/// Details about the new cluster layout
pub layout: GetClusterLayoutResponse,
}
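The corresponding request body is minimal, since the version number only serves as the safety check described above (value invented):

{ "version": 2 }

The response pairs the free-form message lines with the applied layout, in the same GetClusterLayoutResponse shape as above.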
@ -334,6 +492,29 @@ pub struct RevertClusterLayoutRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- ClusterLayoutSkipDeadNodes ----
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutSkipDeadNodesRequest {
/// Version number of the layout to assume is currently up-to-date.
/// This will generally be the current layout version.
pub version: u64,
/// Allow the skip even if a quorum of nodes could not be found for
/// the data among the remaining nodes
pub allow_missing_data: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutSkipDeadNodesResponse {
/// Nodes for which the ACK update tracker has been updated to `version`
pub ack_updated: Vec<String>,
/// If `allow_missing_data` is set,
/// nodes for which the SYNC update tracker has been updated to `version`
pub sync_updated: Vec<String>,
}
// **********************************************
// Access key operations
// **********************************************
@ -367,7 +548,7 @@ pub struct GetKeyInfoRequest {
pub struct GetKeyInfoResponse {
pub name: String,
pub access_key_id: String,
#[serde(skip_serializing_if = "is_default")]
#[serde(default, skip_serializing_if = "is_default")]
pub secret_access_key: Option<String>,
pub permissions: KeyPerm,
pub buckets: Vec<KeyInfoBucketResponse>,

@ -3,6 +3,7 @@ use std::sync::Arc;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_rpc::layout;
@ -54,8 +55,7 @@ impl RequestHandler for GetClusterStatusRequest {
for (id, _, role) in layout.current().roles.items().iter() {
if let layout::NodeRoleV(Some(r)) = role {
let role = NodeRoleResp {
id: hex::encode(id),
let role = NodeAssignedRole {
zone: r.zone.to_string(),
capacity: r.capacity,
tags: r.tags.clone(),
@ -181,17 +181,23 @@ impl RequestHandler for GetClusterLayoutRequest {
}
fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
let roles = layout
.current()
let current = layout.current();
let roles = current
.roles
.items()
.iter()
.filter_map(|(k, _, v)| v.0.clone().map(|x| (k, x)))
.map(|(k, v)| NodeRoleResp {
id: hex::encode(k),
zone: v.zone.clone(),
capacity: v.capacity,
tags: v.tags.clone(),
.map(|(k, v)| {
let stored_partitions = current.get_node_usage(k).ok().map(|x| x as u64);
LayoutNodeRole {
id: hex::encode(k),
zone: v.zone.clone(),
capacity: v.capacity,
stored_partitions,
usable_capacity: stored_partitions.map(|x| x * current.partition_size),
tags: v.tags.clone(),
}
})
.collect::<Vec<_>>();
@ -201,7 +207,7 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
.roles
.items()
.iter()
.filter(|(k, _, v)| layout.current().roles.get(k) != Some(v))
.filter(|(k, _, v)| current.roles.get(k) != Some(v))
.map(|(k, _, v)| match &v.0 {
None => NodeRoleChange {
id: hex::encode(k),
@ -209,19 +215,108 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
},
Some(r) => NodeRoleChange {
id: hex::encode(k),
action: NodeRoleChangeEnum::Update {
action: NodeRoleChangeEnum::Update(NodeAssignedRole {
zone: r.zone.clone(),
capacity: r.capacity,
tags: r.tags.clone(),
},
}),
},
})
.collect::<Vec<_>>();
let staged_parameters = if *layout.staging.get().parameters.get() != current.parameters {
Some((*layout.staging.get().parameters.get()).into())
} else {
None
};
GetClusterLayoutResponse {
version: layout.current().version,
version: current.version,
roles,
partition_size: current.partition_size,
parameters: current.parameters.into(),
staged_role_changes,
staged_parameters,
}
}
impl RequestHandler for GetClusterLayoutHistoryRequest {
type Response = GetClusterLayoutHistoryResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterLayoutHistoryResponse, Error> {
let layout_helper = garage.system.cluster_layout();
let layout = layout_helper.inner();
let min_stored = layout.min_stored();
let versions = layout
.versions
.iter()
.rev()
.chain(layout.old_versions.iter().rev())
.map(|ver| {
let status = if ver.version == layout.current().version {
ClusterLayoutVersionStatus::Current
} else if ver.version >= min_stored {
ClusterLayoutVersionStatus::Draining
} else {
ClusterLayoutVersionStatus::Historical
};
ClusterLayoutVersion {
version: ver.version,
status,
storage_nodes: ver
.roles
.items()
.iter()
.filter(
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()),
)
.count() as u64,
gateway_nodes: ver
.roles
.items()
.iter()
.filter(
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()),
)
.count() as u64,
}
})
.collect::<Vec<_>>();
let all_nodes = layout.get_all_nodes();
let min_ack = layout_helper.ack_map_min();
let update_trackers = if layout.versions.len() > 1 {
Some(
all_nodes
.iter()
.map(|node| {
(
hex::encode(&node),
NodeUpdateTrackers {
ack: layout.update_trackers.ack_map.get(node, min_stored),
sync: layout.update_trackers.sync_map.get(node, min_stored),
sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored),
},
)
})
.collect(),
)
} else {
None
};
Ok(GetClusterLayoutHistoryResponse {
current_version: layout.current().version,
min_ack,
versions,
update_trackers,
})
}
}
@ -242,21 +337,26 @@ impl RequestHandler for UpdateClusterLayoutRequest {
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging.get().roles);
for change in self.0 {
for change in self.roles {
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
let new_role = match change.action {
NodeRoleChangeEnum::Remove { remove: true } => None,
NodeRoleChangeEnum::Update {
NodeRoleChangeEnum::Update(NodeAssignedRole {
zone,
capacity,
tags,
} => Some(layout::NodeRole {
zone,
capacity,
tags,
}),
}) => {
if matches!(capacity, Some(cap) if cap < 1024) {
return Err(Error::bad_request("Capacity should be at least 1K (1024)"));
}
Some(layout::NodeRole {
zone,
capacity,
tags,
})
}
_ => return Err(Error::bad_request("Invalid layout change")),
};
@ -267,6 +367,22 @@ impl RequestHandler for UpdateClusterLayoutRequest {
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
if let Some(param) = self.parameters {
if let ZoneRedundancy::AtLeast(r_int) = param.zone_redundancy {
if r_int > layout.current().replication_factor {
return Err(Error::bad_request(format!(
"The zone redundancy must be smaller or equal to the replication factor ({}).",
layout.current().replication_factor
)));
} else if r_int < 1 {
return Err(Error::bad_request(
"The zone redundancy must be at least 1.",
));
}
}
layout.staging.get_mut().parameters.update(param.into());
}
garage
.system
.layout_manager
@ -278,6 +394,29 @@ impl RequestHandler for UpdateClusterLayoutRequest {
}
}
impl RequestHandler for PreviewClusterLayoutChangesRequest {
type Response = PreviewClusterLayoutChangesResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<PreviewClusterLayoutChangesResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let new_ver = layout.current().version + 1;
match layout.apply_staged_changes(new_ver) {
Err(GarageError::Message(error)) => {
Ok(PreviewClusterLayoutChangesResponse::Error { error })
}
Err(e) => Err(e.into()),
Ok((new_layout, msg)) => Ok(PreviewClusterLayoutChangesResponse::Success {
message: msg,
new_layout: format_cluster_layout(&new_layout),
}),
}
}
}
impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse;
@ -287,7 +426,7 @@ impl RequestHandler for ApplyClusterLayoutRequest {
_admin: &Admin,
) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
let (layout, msg) = layout.apply_staged_changes(self.version)?;
garage
.system
@ -322,3 +461,100 @@ impl RequestHandler for RevertClusterLayoutRequest {
Ok(RevertClusterLayoutResponse(res))
}
}
impl RequestHandler for ClusterLayoutSkipDeadNodesRequest {
type Response = ClusterLayoutSkipDeadNodesResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ClusterLayoutSkipDeadNodesResponse, Error> {
let status = garage.system.get_known_nodes();
let mut layout = garage.system.cluster_layout().inner().clone();
let mut ack_updated = vec![];
let mut sync_updated = vec![];
if layout.versions.len() == 1 {
return Err(Error::bad_request(
"This command cannot be called when there is only one live cluster layout version",
));
}
let min_v = layout.min_stored();
if self.version <= min_v || self.version > layout.current().version {
return Err(Error::bad_request(format!(
"Invalid version, you may use the following version numbers: {}",
(min_v + 1..=layout.current().version)
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(" ")
)));
}
let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() {
// Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
if self.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
if layout.update_trackers.ack_map.set_max(*node, self.version) {
ack_updated.push(hex::encode(node));
}
}
// If --allow-missing-data, update SYNC tracker for all nodes.
if self.allow_missing_data {
if layout.update_trackers.sync_map.set_max(*node, self.version) {
sync_updated.push(hex::encode(node));
}
}
}
garage
.system
.layout_manager
.update_cluster_layout(&layout)
.await?;
Ok(ClusterLayoutSkipDeadNodesResponse {
ack_updated,
sync_updated,
})
}
}
// ----
impl From<layout::ZoneRedundancy> for ZoneRedundancy {
fn from(x: layout::ZoneRedundancy) -> Self {
match x {
layout::ZoneRedundancy::Maximum => ZoneRedundancy::Maximum,
layout::ZoneRedundancy::AtLeast(x) => ZoneRedundancy::AtLeast(x),
}
}
}
impl Into<layout::ZoneRedundancy> for ZoneRedundancy {
fn into(self) -> layout::ZoneRedundancy {
match self {
ZoneRedundancy::Maximum => layout::ZoneRedundancy::Maximum,
ZoneRedundancy::AtLeast(x) => layout::ZoneRedundancy::AtLeast(x),
}
}
}
impl From<layout::LayoutParameters> for LayoutParameters {
fn from(x: layout::LayoutParameters) -> Self {
LayoutParameters {
zone_redundancy: x.zone_redundancy.into(),
}
}
}
impl Into<layout::LayoutParameters> for LayoutParameters {
fn into(self) -> layout::LayoutParameters {
layout::LayoutParameters {
zone_redundancy: self.zone_redundancy.into(),
}
}
}
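To make the new validations in the UpdateClusterLayout handler above concrete, here are two submissions that it would reject, written as the JSON bodies a client might send (identifier and values invented):

{ "roles": [ { "id": "7e97d7a4d0c0f596", "zone": "dc1", "capacity": 512, "tags": [] } ] }

is refused with "Capacity should be at least 1K (1024)", and

{ "parameters": { "zoneRedundancy": { "atLeast": 5 } } }

is refused whenever 5 exceeds the cluster's replication factor.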

@ -88,6 +88,19 @@ Returns the cluster's current layout, including:
)]
fn GetClusterLayout() -> () {}
#[utoipa::path(get,
path = "/v2/GetClusterLayoutHistory",
tag = "Cluster layout",
description = "
Returns the history of layouts in the cluster
",
responses(
(status = 200, description = "Cluster layout history", body = GetClusterLayoutHistoryResponse),
(status = 500, description = "Internal server error")
),
)]
fn GetClusterLayoutHistory() -> () {}
#[utoipa::path(post,
path = "/v2/UpdateClusterLayout",
tag = "Cluster layout",
@ -117,6 +130,21 @@ Contrary to the CLI that may update only a subset of the fields capacity, zone a
)]
fn UpdateClusterLayout() -> () {}
#[utoipa::path(post,
path = "/v2/PreviewClusterLayoutChanges",
tag = "Cluster layout",
description = "
Computes a new layout taking into account the staged parameters, and returns it with detailed statistics. The new layout is not applied in the cluster.
*Note: do not try to parse the `message` field of the response, it is given as an array of string specifically because its format is not stable.*
",
responses(
(status = 200, description = "Information about the new layout", body = PreviewClusterLayoutChangesResponse),
(status = 500, description = "Internal server error")
),
)]
fn PreviewClusterLayoutChanges() -> () {}
#[utoipa::path(post,
path = "/v2/ApplyClusterLayout",
tag = "Cluster layout",
@ -144,6 +172,18 @@ fn ApplyClusterLayout() -> () {}
)]
fn RevertClusterLayout() -> () {}
#[utoipa::path(post,
path = "/v2/ClusterLayoutSkipDeadNodes",
tag = "Cluster layout",
description = "Force progress in layout update trackers",
request_body = ClusterLayoutSkipDeadNodesRequest,
responses(
(status = 200, description = "Request has been taken into account", body = ClusterLayoutSkipDeadNodesResponse),
(status = 500, description = "Internal server error")
),
)]
fn ClusterLayoutSkipDeadNodes() -> () {}
// **********************************************
// Access key operations
// **********************************************
@ -685,9 +725,12 @@ impl Modify for SecurityAddon {
ConnectClusterNodes,
// Layout operations
GetClusterLayout,
GetClusterLayoutHistory,
UpdateClusterLayout,
PreviewClusterLayoutChanges,
ApplyClusterLayout,
RevertClusterLayout,
ClusterLayoutSkipDeadNodes,
// Key operations
ListKeys,
GetKeyInfo,

@ -36,9 +36,12 @@ impl AdminApiRequest {
POST ConnectClusterNodes (body),
// Layout endpoints
GET GetClusterLayout (),
GET GetClusterLayoutHistory (),
POST UpdateClusterLayout (body),
POST PreviewClusterLayoutChanges (),
POST ApplyClusterLayout (body),
POST RevertClusterLayout (),
POST ClusterLayoutSkipDeadNodes (body),
// API key endpoints
GET GetKeyInfo (query_opt::id, query_opt::search, parse_default(false)::show_secret_key),
POST UpdateKey (body_field, query::id),
@ -108,10 +111,7 @@ impl AdminApiRequest {
Endpoint::GetClusterLayout => {
Ok(AdminApiRequest::GetClusterLayout(GetClusterLayoutRequest))
}
Endpoint::UpdateClusterLayout => {
let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
Ok(AdminApiRequest::UpdateClusterLayout(updates))
}
// UpdateClusterLayout semantics changed
Endpoint::ApplyClusterLayout => {
let param = parse_json_body::<ApplyClusterLayoutRequest, _, Error>(req).await?;
Ok(AdminApiRequest::ApplyClusterLayout(param))

@ -1,540 +0,0 @@
mod block;
mod bucket;
mod key;
use std::collections::HashMap;
use std::fmt::Write;
use std::future::Future;
use std::sync::Arc;
use futures::future::FutureExt;
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_table::replication::*;
use garage_table::*;
use garage_rpc::layout::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
LaunchRepair(RepairOpt),
Stats(StatsOpt),
Worker(WorkerOperation),
BlockOperation(BlockOperation),
MetaOperation(MetaOperation),
// Replies
Ok(String),
BucketList(Vec<Bucket>),
BucketInfo {
bucket: Bucket,
relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>,
mpu_counters: HashMap<String, i64>,
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
WorkerList(
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
},
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpc, Error>;
}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage,
background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
}
// ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
"Please provide the --yes flag to initiate repair operations.".to_string(),
));
}
if opt.all_nodes {
let mut opt_to_send = opt.clone();
opt_to_send.all_nodes = false;
let mut failures = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
.call(
&node,
AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
if !matches!(resp, Ok(Ok(_))) {
failures.push(node);
}
}
if failures.is_empty() {
Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
} else {
Err(Error::BadRequest(format!(
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
failures
)))
}
} else {
launch_online_repair(&self.garage, &self.background, opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
)))
}
}
// ================ STATS COMMANDS ====================
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in self.garage.system.get_known_nodes().iter() {
if node.is_up && !all_nodes.contains(&node.id) {
all_nodes.push(node.id);
}
}
for node in all_nodes.iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
opt.skip_global = true;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
let node_id = (*node).into();
match self
.endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await
{
Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
writeln!(&mut ret, "\n======================").unwrap();
write!(
&mut ret,
"Cluster statistics:\n\n{}",
self.gather_cluster_stats()
)
.unwrap();
Ok(AdminRpc::Ok(ret))
} else {
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
"\nGarage version: {} [features: {}]\nRust compiler version: {}",
garage_util::version::garage_version(),
garage_util::version::garage_features()
.map(|list| list.join(", "))
.unwrap_or_else(|| "(unknown)".into()),
garage_util::version::rust_version(),
)
.unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table)?);
table.push(self.gather_table_stats(&self.garage.key_table)?);
table.push(self.gather_table_stats(&self.garage.object_table)?);
table.push(self.gather_table_stats(&self.garage.version_table)?);
table.push(self.gather_table_stats(&self.garage.block_ref_table)?);
write!(
&mut ret,
"\nTable stats:\n{}",
format_table_to_string(table)
)
.unwrap();
// Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
let rc_len = self.garage.block_manager.rc_len()?.to_string();
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
rc_len
)
.unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
self.garage.block_manager.resync.queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
self.garage.block_manager.resync.errors_len()?
)
.unwrap();
if !opt.skip_global {
write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
}
Ok(ret)
}
fn gather_cluster_stats(&self) -> String {
let mut ret = String::new();
// Gather storage node and free space statistics for current nodes
let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.current().ring_assignment_data.iter() {
let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
let node_info = self
.garage
.system
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<_, _>>();
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id);
let status = info.map(|x| &x.status);
let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role
.map(|x| x.capacity_string())
.unwrap_or_else(|| "?".into());
let avail_str = |x| match x {
Some((avail, total)) => {
let pct = (avail as f64) / (total as f64) * 100.;
let avail = bytesize::ByteSize::b(avail);
let total = bytesize::ByteSize::b(total);
format!("{}/{} ({:.1}%)", avail, total, pct)
}
None => "?".into(),
};
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
table.push(format!(
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
id, hostname, zone, capacity, parts, data_avail, meta_avail
));
}
write!(
&mut ret,
"Storage nodes:\n{}",
format_table_to_string(table)
)
.unwrap();
let meta_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.meta_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
let data_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.data_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
let meta_avail =
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
let data_avail =
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
writeln!(
&mut ret,
"\nEstimated available storage space cluster-wide (might be lower in practice):"
)
.unwrap();
if meta_part_avail.len() < node_partition_count.len()
|| data_part_avail.len() < node_partition_count.len()
{
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
} else {
writeln!(&mut ret, " data: {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
}
}
ret
}
fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!(
" {}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
}
// ================ WORKER COMMANDS ====================
async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
match cmd {
WorkerOperation::List { opt } => {
let workers = self.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, *opt))
}
WorkerOperation::Info { tid } => {
let info = self
.background
.get_worker_info()
.get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Get {
all_nodes,
variable,
} => self.handle_get_var(*all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.handle_set_var(*all_nodes, variable, value).await,
}
}
async fn handle_get_var(
&self,
all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(v) = variable {
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
}
}
}
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
self.garage.bg_vars.set(variable, value)?;
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
variable.to_string(),
value.to_string(),
)]))
}
}
// ================ META DB COMMANDS ====================
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
match mo {
MetaOperation::Snapshot { all: true } => {
let to = self.garage.system.cluster_layout().all_nodes().to_vec();
let resps = futures::future::join_all(to.iter().map(|to| async move {
let to = (*to).into();
self.endpoint
.call(
&to,
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
.await?
}))
.await;
let mut ret = vec![];
for (to, resp) in to.iter().zip(resps.iter()) {
let res_str = match resp {
Ok(_) => "ok".to_string(),
Err(e) => format!("error: {}", e),
};
ret.push(format!("{:?}\t{}", to, res_str));
}
if resps.iter().any(Result::is_err) {
Err(GarageError::Message(format_table_to_string(ret)).into())
} else {
Ok(AdminRpc::Ok(format_table_to_string(ret)))
}
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
}
}
}
}
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
let self2 = self.clone();
async move {
match message {
AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
.boxed()
}
}

@ -1,387 +0,0 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
println!(
"Current cluster layout version: {}",
layout.current().version
);
let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes {
let v = layout.current().version;
let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
// will move around when we apply
match res_apply {
Ok((layout, msg)) => {
println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
print_cluster_layout(layout.current(), "No nodes have a role in the new layout.");
println!();
for line in msg.iter() {
println!("{}", line);
}
println!("To enact the staged role changes, type:");
println!();
println!(" garage layout apply --version {}", v + 1);
println!();
println!("You can also revert all proposed changes with: garage layout revert");
}
Err(e) => {
println!("Error while trying to compute the assignment: {}", e);
println!("This new layout cannot yet be applied.");
println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
Ok(())
}
pub async fn cmd_config_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
config_opt: ConfigLayoutOpt,
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut did_something = false;
match config_opt.redundancy {
None => (),
Some(r_str) => {
let r = r_str
.parse::<ZoneRedundancy>()
.ok_or_message("invalid zone redundancy value")?;
if let ZoneRedundancy::AtLeast(r_int) = r {
if r_int > layout.current().replication_factor {
return Err(Error::Message(format!(
"The zone redundancy must be smaller or equal to the \
replication factor ({}).",
layout.current().replication_factor
)));
} else if r_int < 1 {
return Err(Error::Message(
"The zone redundancy must be at least 1.".into(),
));
}
}
layout
.staging
.get_mut()
.parameters
.update(LayoutParameters { zone_redundancy: r });
println!("The zone redundancy parameter has been set to '{}'.", r);
did_something = true;
}
}
if !did_something {
return Err(Error::Message(
"Please specify an action for `garage layout config`".into(),
));
}
send_layout(rpc_cli, rpc_host, layout).await?;
Ok(())
}
pub async fn cmd_layout_history(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let min_stored = layout.min_stored();
println!("==== LAYOUT HISTORY ====");
let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
for ver in layout
.versions
.iter()
.rev()
.chain(layout.old_versions.iter().rev())
{
let status = if ver.version == layout.current().version {
"current"
} else if ver.version >= min_stored {
"draining"
} else {
"historical"
};
table.push(format!(
"#{}\t{}\t{}\t{}",
ver.version,
status,
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
.count(),
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
.count(),
));
}
format_table(table);
println!();
if layout.versions.len() > 1 {
println!("==== UPDATE TRACKERS ====");
println!("Several layout versions are currently live in the cluster, and data is being migrated.");
println!(
"This is the internal data that Garage stores to know which nodes have what data."
);
println!();
let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() {
table.push(format!(
"{:?}\t#{}\t#{}\t#{}",
node,
layout.update_trackers.ack_map.get(node, min_stored),
layout.update_trackers.sync_map.get(node, min_stored),
layout.update_trackers.sync_ack_map.get(node, min_stored),
));
}
table[1..].sort();
format_table(table);
let min_ack = layout
.update_trackers
.ack_map
.min_among(&all_nodes, layout.min_stored());
println!();
println!(
"If some nodes are not catching up to the latest layout version in the update trackers,"
);
println!("it might be because they are offline or unable to complete a sync successfully.");
if min_ack < layout.current().version {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {}`",
layout.current().version
);
} else {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.",
layout.current().version
);
}
} else {
println!("Your cluster is currently in a stable state with a single live layout version.");
println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
println!(
"so you might want to keep old nodes online until their data directories become empty."
);
}
Ok(())
}
pub async fn cmd_layout_skip_dead_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
opt: SkipDeadNodesOpt,
) -> Result<(), Error> {
let status = fetch_status(rpc_cli, rpc_host).await?;
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
if layout.versions.len() == 1 {
return Err(Error::Message(
"This command cannot be called when there is only one live cluster layout version"
.into(),
));
}
let min_v = layout.min_stored();
if opt.version <= min_v || opt.version > layout.current().version {
return Err(Error::Message(format!(
"Invalid version, you may use the following version numbers: {}",
(min_v + 1..=layout.current().version)
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(" ")
)));
}
let all_nodes = layout.get_all_nodes();
let mut did_something = false;
for node in all_nodes.iter() {
// Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
if opt.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
if layout.update_trackers.ack_map.set_max(*node, opt.version) {
println!("Increased the ACK tracker for node {:?}", node);
did_something = true;
}
}
// If --allow-missing-data, update SYNC tracker for all nodes.
if opt.allow_missing_data {
if layout.update_trackers.sync_map.set_max(*node, opt.version) {
println!("Increased the SYNC tracker for node {:?}", node);
did_something = true;
}
}
}
if did_something {
send_layout(rpc_cli, rpc_host, layout).await?;
println!("Success.");
Ok(())
} else if !opt.allow_missing_data {
Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
} else {
Err(Error::Message(
"Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
))
}
}
// --- utility ---
pub async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
layout: LayoutHistory,
) -> Result<(), Error> {
rpc_cli
.call(
&rpc_host,
SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
)
.await??;
Ok(())
}
pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
Some(r) => r,
_ => continue,
};
let tags = role.tags.join(",");
let usage = layout.get_node_usage(id).unwrap_or(0);
let capacity = layout.get_node_capacity(id).unwrap_or(0);
if capacity > 0 {
table.push(format!(
"{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
id,
tags,
role.zone,
role.capacity_string(),
ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
(100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
));
} else {
table.push(format!(
"{:?}\t{}\t{}\t{}",
id,
tags,
role.zone,
role.capacity_string()
));
};
}
if table.len() > 1 {
format_table(table);
println!();
println!("Zone redundancy: {}", layout.parameters.zone_redundancy);
} else {
println!("{}", empty_msg);
}
}
pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
let staging = layout.staging.get();
let has_role_changes = staging
.roles
.items()
.iter()
.any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
let has_layout_changes = *staging.parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for (id, _, role) in staging.roles.items().iter() {
if layout.current().roles.get(id) == Some(role) {
continue;
}
if let Some(role) = &role.0 {
let tags = role.tags.join(",");
table.push(format!(
"{:?}\t{}\t{}\t{}",
id,
tags,
role.zone,
role.capacity_string()
));
} else {
table.push(format!("{:?}\tREMOVED", id));
}
}
format_table(table);
println!();
}
if has_layout_changes {
println!(
"Zone redundancy: {}",
staging.parameters.get().zone_redundancy
);
}
true
} else {
false
}
}

View file

@ -0,0 +1,3 @@
pub(crate) mod convert_db;
pub(crate) mod init;
pub(crate) mod repair;

View file

@ -1,7 +1,4 @@
pub(crate) mod structs;
pub mod structs;
pub(crate) mod convert_db;
pub(crate) mod init;
pub(crate) mod repair;
pub(crate) mod layout;
pub mod local;
pub mod remote;

View file

@ -5,8 +5,8 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {

View file

@ -5,8 +5,8 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_bucket(&self, cmd: BucketOperation) -> Result<(), Error> {

View file

@ -4,9 +4,9 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::layout::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::layout::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_status(&self) -> Result<(), Error> {

View file

@ -4,8 +4,8 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_key(&self, cmd: KeyOperation) -> Result<(), Error> {

View file

@ -0,0 +1,474 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
impl Cli {
pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> {
match cmd {
LayoutOperation::Show => self.cmd_show_layout().await,
LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await,
LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await,
LayoutOperation::Config(config_opt) => self.cmd_config_layout(config_opt).await,
LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
LayoutOperation::History => self.cmd_layout_history().await,
LayoutOperation::SkipDeadNodes(opt) => self.cmd_skip_dead_nodes(opt).await,
}
}
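// Prints the current cluster layout and any staged changes, then asks the
// admin API for a preview of the layout that would result from applying them.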
pub async fn cmd_show_layout(&self) -> Result<(), Error> {
let layout = self.api_request(GetClusterLayoutRequest).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
println!("Current cluster layout version: {}", layout.version);
let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes {
let res_apply = self.api_request(PreviewClusterLayoutChangesRequest).await?;
// This prints statistics about which partitions will move
// between nodes when the staged changes are applied.
match res_apply {
PreviewClusterLayoutChangesResponse::Success {
message,
new_layout,
} => {
println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
print_cluster_layout(&new_layout, "No nodes have a role in the new layout.");
println!();
for line in message.iter() {
println!("{}", line);
}
println!("To enact the staged role changes, type:");
println!();
println!(" garage layout apply --version {}", new_layout.version);
println!();
println!("You can also revert all proposed changes with: garage layout revert");
}
PreviewClusterLayoutChangesResponse::Error { error } => {
println!("Error while trying to compute the assignment: {}", error);
println!("This new layout cannot yet be applied.");
println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
Ok(())
}
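// Stages new roles (zone, capacity or gateway flag, tags) for the given nodes,
// reusing their current values for parameters that are not specified.
// Nodes listed in `opt.replace` are staged for removal instead.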
pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let mut actions = vec![];
for node in opt.replace.iter() {
let id = find_matching_node(&status, &layout, &node)?;
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
});
}
for node in opt.node_ids.iter() {
let id = find_matching_node(&status, &layout, &node)?;
let current = get_staged_or_current_role(&id, &layout);
let zone = opt
.zone
.clone()
.or_else(|| current.as_ref().map(|c| c.zone.clone()))
.ok_or_message("Please specify a zone with the -z flag")?;
let capacity = if opt.gateway {
if opt.capacity.is_some() {
return Err(Error::Message("Please specify only -c or -g".into()));
}
None
} else if let Some(cap) = opt.capacity {
Some(cap.as_u64())
} else {
current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity
};
let tags = if !opt.tags.is_empty() {
opt.tags.clone()
} else if let Some(cur) = current.as_ref() {
cur.tags.clone()
} else {
vec![]
};
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Update(NodeAssignedRole {
zone,
capacity,
tags,
}),
});
}
self.api_request(UpdateClusterLayoutRequest {
roles: actions,
parameters: None,
})
.await?;
println!("Role changes are staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
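// Stages the removal of a node's role from the cluster layout.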
pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let id = find_matching_node(&status, &layout, &opt.node_id)?;
let actions = vec![NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
}];
self.api_request(UpdateClusterLayoutRequest {
roles: actions,
parameters: None,
})
.await?;
println!("Role removal is staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
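// Stages a change of the layout parameters (currently only the zone redundancy).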
pub async fn cmd_config_layout(&self, config_opt: ConfigLayoutOpt) -> Result<(), Error> {
let mut did_something = false;
match config_opt.redundancy {
None => (),
Some(r_str) => {
let r = parse_zone_redundancy(&r_str)?;
self.api_request(UpdateClusterLayoutRequest {
roles: vec![],
parameters: Some(LayoutParameters { zone_redundancy: r }),
})
.await?;
println!(
"The zone redundancy parameter has been set to '{}'.",
display_zone_redundancy(r)
);
did_something = true;
}
}
if !did_something {
return Err(Error::Message(
"Please specify an action for `garage layout config`".into(),
));
}
Ok(())
}
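// Applies the staged layout changes; the expected new layout version number
// must be given explicitly as a safety check.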
pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> {
let missing_version_error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
let req = ApplyClusterLayoutRequest {
version: apply_opt.version.ok_or_message(missing_version_error)?,
};
let res = self.api_request(req).await?;
for line in res.message.iter() {
println!("{}", line);
}
println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
}
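// Discards all staged layout changes (requires the --yes flag).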
pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> {
if !revert_opt.yes {
return Err(Error::Message(
"Please add the --yes flag to run the layout revert operation".into(),
));
}
self.api_request(RevertClusterLayoutRequest).await?;
println!("All proposed role changes in cluster layout have been canceled.");
Ok(())
}
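// Prints the history of layout versions and, when a migration is in progress,
// the per-node update trackers together with hints on how to force progress.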
pub async fn cmd_layout_history(&self) -> Result<(), Error> {
let history = self.api_request(GetClusterLayoutHistoryRequest).await?;
println!("==== LAYOUT HISTORY ====");
let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
for ver in history.versions.iter() {
table.push(format!(
"#{}\t{:?}\t{}\t{}",
ver.version, ver.status, ver.storage_nodes, ver.gateway_nodes,
));
}
format_table(table);
println!();
if let Some(update_trackers) = history.update_trackers {
println!("==== UPDATE TRACKERS ====");
println!("Several layout versions are currently live in the cluster, and data is being migrated.");
println!(
"This is the internal data that Garage stores to know which nodes have what data."
);
println!();
let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
for (node, trackers) in update_trackers.iter() {
table.push(format!(
"{:.16}\t#{}\t#{}\t#{}",
node, trackers.ack, trackers.sync, trackers.sync_ack,
));
}
table[1..].sort();
format_table(table);
println!();
println!(
"If some nodes are not catching up to the latest layout version in the update trackers,"
);
println!(
"it might be because they are offline or unable to complete a sync successfully."
);
if history.min_ack < history.current_version {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {}`",
history.current_version
);
} else {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.",
history.current_version
);
}
} else {
println!(
"Your cluster is currently in a stable state with a single live layout version."
);
println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
println!(
"so you might want to keep old nodes online until their data directories become empty."
);
}
Ok(())
}
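// Sends a ClusterLayoutSkipDeadNodesRequest to force progress of the layout
// update trackers, and reports which ACK/SYNC trackers were advanced.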
pub async fn cmd_skip_dead_nodes(&self, opt: SkipDeadNodesOpt) -> Result<(), Error> {
let res = self
.api_request(ClusterLayoutSkipDeadNodesRequest {
version: opt.version,
allow_missing_data: opt.allow_missing_data,
})
.await?;
if !res.sync_updated.is_empty() || !res.ack_updated.is_empty() {
for node in res.ack_updated.iter() {
println!("Increased the ACK tracker for node {:.16}", node);
}
for node in res.sync_updated.iter() {
println!("Increased the SYNC tracker for node {:.16}", node);
}
Ok(())
} else if !opt.allow_missing_data {
Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
} else {
Err(Error::Message(
"Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
))
}
}
}
// --------------------------
// ---- helper functions ----
// --------------------------
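// Renders an optional capacity as a human-readable byte size,
// or "gateway" when the node has no storage capacity.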
pub fn capacity_string(v: Option<u64>) -> String {
match v {
Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
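// Returns the role a node will have once staged changes are applied,
// falling back to its currently assigned role; None if the node has no role
// or is staged for removal.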
pub fn get_staged_or_current_role(
id: &str,
layout: &GetClusterLayoutResponse,
) -> Option<NodeAssignedRole> {
for node in layout.staged_role_changes.iter() {
if node.id == id {
return match &node.action {
NodeRoleChangeEnum::Remove { .. } => None,
NodeRoleChangeEnum::Update(role) => Some(role.clone()),
};
}
}
for node in layout.roles.iter() {
if node.id == id {
return Some(NodeAssignedRole {
zone: node.zone.clone(),
capacity: node.capacity,
tags: node.tags.clone(),
});
}
}
None
}
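// Resolves a node ID prefix against the known nodes and the nodes assigned in
// the layout; errors unless exactly one node matches the given pattern.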
pub fn find_matching_node<'a>(
status: &GetClusterStatusResponse,
layout: &GetClusterLayoutResponse,
pattern: &'a str,
) -> Result<String, Error> {
let all_node_ids_iter = status
.nodes
.iter()
.map(|x| x.id.as_str())
.chain(layout.roles.iter().map(|x| x.id.as_str()));
let mut candidates = vec![];
for c in all_node_ids_iter {
if c.starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0].to_string())
}
}
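// Prints the role table of a layout (ID, tags, zone, capacity and usable
// capacity), followed by the zone redundancy parameter, or `empty_msg` if no
// node currently has a role.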
pub fn print_cluster_layout(layout: &GetClusterLayoutResponse, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for role in layout.roles.iter() {
let tags = role.tags.join(",");
if let (Some(capacity), Some(usable_capacity)) = (role.capacity, role.usable_capacity) {
table.push(format!(
"{:.16}\t{}\t{}\t{}\t{} ({:.1}%)",
role.id,
tags,
role.zone,
capacity_string(role.capacity),
ByteSize::b(usable_capacity).to_string_as(false),
(100.0 * usable_capacity as f32) / (capacity as f32)
));
} else {
table.push(format!(
"{:.16}\t{}\t{}\t{}",
role.id,
tags,
role.zone,
capacity_string(role.capacity),
));
};
}
if table.len() > 1 {
format_table(table);
println!();
println!(
"Zone redundancy: {}",
display_zone_redundancy(layout.parameters.zone_redundancy),
);
} else {
println!("{}", empty_msg);
}
}
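// Prints the staged role and parameter changes, if any;
// returns true when something is staged.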
pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool {
let has_role_changes = !layout.staged_role_changes.is_empty();
let has_layout_changes = layout.staged_parameters.is_some();
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for change in layout.staged_role_changes.iter() {
match &change.action {
NodeRoleChangeEnum::Update(NodeAssignedRole {
tags,
zone,
capacity,
}) => {
let tags = tags.join(",");
table.push(format!(
"{:.16}\t{}\t{}\t{}",
change.id,
tags,
zone,
capacity_string(*capacity),
));
}
NodeRoleChangeEnum::Remove { .. } => {
table.push(format!("{:.16}\tREMOVED", change.id));
}
}
}
format_table(table);
println!();
}
if let Some(p) = layout.staged_parameters.as_ref() {
println!(
"Zone redundancy: {}",
display_zone_redundancy(p.zone_redundancy)
);
}
true
} else {
false
}
}
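// Formats the zone redundancy parameter as displayed by the CLI
// ("maximum" or the minimum number of distinct zones).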
pub fn display_zone_redundancy(z: ZoneRedundancy) -> String {
match z {
ZoneRedundancy::Maximum => "maximum".into(),
ZoneRedundancy::AtLeast(x) => x.to_string(),
}
}
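// Parses the zone redundancy argument: "none", "max" and "maximum" select
// maximum redundancy, an integer selects at-least-that-many zones.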
pub fn parse_zone_redundancy(s: &str) -> Result<ZoneRedundancy, Error> {
match s {
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
x => {
let v = x.parse::<usize>().map_err(|_| {
Error::Message("zone redundancy must be 'none'/'max' or an integer".into())
})?;
Ok(ZoneRedundancy::AtLeast(v))
}
}
}

View file

@ -13,7 +13,6 @@ use std::time::Duration;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
@ -23,7 +22,6 @@ use garage_api_admin::RequestHandler;
use crate::cli::structs::*;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}

View file

@ -4,8 +4,8 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {

View file

@ -4,8 +4,8 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::remote::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {

View file

@ -2,7 +2,7 @@ use structopt::StructOpt;
use garage_util::version::garage_version;
use crate::cli::convert_db;
use crate::cli::local::convert_db;
#[derive(StructOpt, Debug)]
pub enum Command {

View file

@ -1,284 +0,0 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::layout as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> {
match cmd {
LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await,
LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await,
LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
// TODO
LayoutOperation::Show => {
cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await
}
LayoutOperation::Config(config_opt) => {
cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt)
.await
}
LayoutOperation::History => {
cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await
}
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cli_v1::cmd_layout_skip_dead_nodes(
&self.system_rpc_endpoint,
self.rpc_host,
assume_sync_opt,
)
.await
}
}
}
pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let all_node_ids_iter = status
.nodes
.iter()
.map(|x| x.id.as_str())
.chain(layout.roles.iter().map(|x| x.id.as_str()));
let mut actions = vec![];
for node in opt.replace.iter() {
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
});
}
for node in opt.node_ids.iter() {
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
let current = get_staged_or_current_role(&id, &layout);
let zone = opt
.zone
.clone()
.or_else(|| current.as_ref().map(|c| c.zone.clone()))
.ok_or_message("Please specify a zone with the -z flag")?;
let capacity = if opt.gateway {
if opt.capacity.is_some() {
return Err(Error::Message("Please specify only -c or -g".into()));
}
None
} else if let Some(cap) = opt.capacity {
Some(cap.as_u64())
} else {
current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity
};
let tags = if !opt.tags.is_empty() {
opt.tags.clone()
} else if let Some(cur) = current.as_ref() {
cur.tags.clone()
} else {
vec![]
};
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Update {
zone,
capacity,
tags,
},
});
}
self.api_request(UpdateClusterLayoutRequest(actions))
.await?;
println!("Role changes are staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let all_node_ids_iter = status
.nodes
.iter()
.map(|x| x.id.as_str())
.chain(layout.roles.iter().map(|x| x.id.as_str()));
let id = find_matching_node(all_node_ids_iter.clone(), &opt.node_id)?;
let actions = vec![NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
}];
self.api_request(UpdateClusterLayoutRequest(actions))
.await?;
println!("Role removal is staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> {
let missing_version_error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
let req = ApplyClusterLayoutRequest {
version: apply_opt.version.ok_or_message(missing_version_error)?,
};
let res = self.api_request(req).await?;
for line in res.message.iter() {
println!("{}", line);
}
println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
}
pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> {
if !revert_opt.yes {
return Err(Error::Message(
"Please add the --yes flag to run the layout revert operation".into(),
));
}
self.api_request(RevertClusterLayoutRequest).await?;
println!("All proposed role changes in cluster layout have been canceled.");
Ok(())
}
}
// --------------------------
// ---- helper functions ----
// --------------------------
pub fn capacity_string(v: Option<u64>) -> String {
match v {
Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
pub fn get_staged_or_current_role(
id: &str,
layout: &GetClusterLayoutResponse,
) -> Option<NodeRoleResp> {
for node in layout.staged_role_changes.iter() {
if node.id == id {
return match &node.action {
NodeRoleChangeEnum::Remove { .. } => None,
NodeRoleChangeEnum::Update {
zone,
capacity,
tags,
} => Some(NodeRoleResp {
id: id.to_string(),
zone: zone.to_string(),
capacity: *capacity,
tags: tags.clone(),
}),
};
}
}
for node in layout.roles.iter() {
if node.id == id {
return Some(node.clone());
}
}
None
}
pub fn find_matching_node<'a>(
cand: impl std::iter::Iterator<Item = &'a str>,
pattern: &'a str,
) -> Result<String, Error> {
let mut candidates = vec![];
for c in cand {
if c.starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0].to_string())
}
}
pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool {
let has_role_changes = !layout.staged_role_changes.is_empty();
// TODO!! Layout parameters
let has_layout_changes = false;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for change in layout.staged_role_changes.iter() {
match &change.action {
NodeRoleChangeEnum::Update {
tags,
zone,
capacity,
} => {
let tags = tags.join(",");
table.push(format!(
"{:.16}\t{}\t{}\t{}",
change.id,
tags,
zone,
capacity_string(*capacity),
));
}
NodeRoleChangeEnum::Remove { .. } => {
table.push(format!("{:.16}\tREMOVED", change.id));
}
}
}
format_table(table);
println!();
}
//TODO
/*
if has_layout_changes {
println!(
"Zone redundancy: {}",
staging.parameters.get().zone_redundancy
);
}
*/
true
} else {
false
}
}

View file

@ -5,7 +5,6 @@
extern crate tracing;
mod cli;
mod cli_v2;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@ -144,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
cli::local::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
cli::local::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
cli::local::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
Command::AdminApiSchema => {
println!(
@ -260,7 +259,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(cli::init::READ_KEY_ERROR)?;
.err_context(cli::local::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@ -289,11 +288,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct full-length node ID (public key).")?;
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
let cli = cli::remote::Cli {
proxy_rpc_endpoint,
rpc_host: id,
};

View file

@ -267,20 +267,9 @@ impl LayoutHistory {
changed
}
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.current().version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
pub fn apply_staged_changes(mut self, version: u64) -> Result<(Self, Message), Error> {
if version != self.current().version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
// Compute new version and add it to history

View file

@ -1,5 +1,3 @@
use std::fmt;
use bytesize::ByteSize;
use garage_util::crdt::{AutoCrdt, Crdt};
@ -397,30 +395,6 @@ impl NodeRole {
}
}
impl fmt::Display for ZoneRedundancy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ZoneRedundancy::Maximum => write!(f, "maximum"),
ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
}
}
}
impl core::str::FromStr for ZoneRedundancy {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
x => {
let v = x
.parse::<usize>()
.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
Ok(ZoneRedundancy::AtLeast(v))
}
}
}
}
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) -> bool {
let mut changed = false;
@ -455,7 +429,7 @@ impl UpdateTracker {
}
}
pub fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
storage_nodes
.iter()
.map(|x| self.get(x, min_version))

View file

@ -124,7 +124,7 @@ fn test_assignment() {
let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
show_msg(&msg);
assert_eq!(cl.check(), Ok(()));
assert!(check_against_naive(cl.current()).unwrap());
@ -133,7 +133,7 @@ fn test_assignment() {
node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"];
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
show_msg(&msg);
assert_eq!(cl.check(), Ok(()));
assert!(check_against_naive(cl.current()).unwrap());
@ -141,7 +141,7 @@ fn test_assignment() {
node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
show_msg(&msg);
assert_eq!(cl.check(), Ok(()));
assert!(check_against_naive(cl.current()).unwrap());
@ -151,7 +151,7 @@ fn test_assignment() {
];
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1);
let v = cl.current().version;
let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
let (cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
show_msg(&msg);
assert_eq!(cl.check(), Ok(()));
assert!(check_against_naive(cl.current()).unwrap());