admin api: implement GetClusterLayoutHistory and use it in CLI

Alex 2025-03-06 18:33:05 +01:00
parent 004866caac
commit 3d94eb8d4b
7 changed files with 340 additions and 114 deletions

View file

@@ -512,6 +512,30 @@
}
}
},
"/v2/GetClusterLayoutHistory": {
"get": {
"tags": [
"Cluster layout"
],
"description": "\nReturns the history of layouts in the cluster\n ",
"operationId": "GetClusterLayoutHistory",
"responses": {
"200": {
"description": "Cluster layout history",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/GetClusterLayoutHistoryResponse"
}
}
}
},
"500": {
"description": "Internal server error"
}
}
}
},
"/v2/GetClusterStatistics": {
"get": {
"tags": [
@@ -1600,6 +1624,43 @@
}
}
},
"ClusterLayoutVersion": {
"type": "object",
"required": [
"version",
"status",
"storageNodes",
"gatewayNodes"
],
"properties": {
"gatewayNodes": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"status": {
"$ref": "#/components/schemas/ClusterLayoutVersionStatus"
},
"storageNodes": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"version": {
"type": "integer",
"format": "int64",
"minimum": 0
}
}
},
"ClusterLayoutVersionStatus": {
"type": "string",
"enum": [
"Current",
"Draining",
"Historical"
]
},
"ConnectClusterNodesRequest": {
"type": "array",
"items": {
@@ -1894,6 +1955,44 @@
}
}
},
"GetClusterLayoutHistoryResponse": {
"type": "object",
"required": [
"currentVersion",
"minAck",
"versions"
],
"properties": {
"currentVersion": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"minAck": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"updateTrackers": {
"type": [
"object",
"null"
],
"additionalProperties": {
"$ref": "#/components/schemas/NodeUpdateTrackers"
},
"propertyNames": {
"type": "string"
}
},
"versions": {
"type": "array",
"items": {
"$ref": "#/components/schemas/ClusterLayoutVersion"
}
}
}
},
"GetClusterLayoutResponse": {
"type": "object",
"required": [
@@ -3060,6 +3159,31 @@
}
]
},
"NodeUpdateTrackers": {
"type": "object",
"required": [
"ack",
"sync",
"syncAck"
],
"properties": {
"ack": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"sync": {
"type": "integer",
"format": "int64",
"minimum": 0
},
"syncAck": {
"type": "integer",
"format": "int64",
"minimum": 0
}
}
},
"PreviewClusterLayoutChangesResponse": {
"oneOf": [
{

View file

@@ -51,6 +51,7 @@ admin_endpoints![
// Layout operations
GetClusterLayout,
GetClusterLayoutHistory,
UpdateClusterLayout,
PreviewClusterLayoutChanges,
ApplyClusterLayout,
@@ -330,6 +331,57 @@ pub enum ZoneRedundancy {
Maximum,
}
// ---- GetClusterLayoutHistory ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterLayoutHistoryRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterLayoutHistoryResponse {
pub current_version: u64,
pub min_ack: u64,
pub versions: Vec<ClusterLayoutVersion>,
pub update_trackers: Option<HashMap<String, NodeUpdateTrackers>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutVersion {
pub version: u64,
pub status: ClusterLayoutVersionStatus,
pub storage_nodes: u64,
pub gateway_nodes: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub enum ClusterLayoutVersionStatus {
Current,
Draining,
Historical,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct NodeUpdateTrackers {
pub ack: u64,
pub sync: u64,
pub sync_ack: u64,
}
// ---- UpdateClusterLayout ----
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UpdateClusterLayoutRequest {
#[serde(default)]
pub roles: Vec<NodeRoleChange>,
#[serde(default)]
pub parameters: Option<LayoutParameters>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- PreviewClusterLayoutChanges ----
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -347,19 +399,6 @@ pub enum PreviewClusterLayoutChangesResponse {
},
}
-// ---- UpdateClusterLayout ----
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-pub struct UpdateClusterLayoutRequest {
-	#[serde(default)]
-	pub roles: Vec<NodeRoleChange>,
-	#[serde(default)]
-	pub parameters: Option<LayoutParameters>,
-}
-#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
-pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- ApplyClusterLayout ----
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
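For illustration, here is a minimal sketch (not part of the patch) of the wire format the new types above produce, assuming the structs are in scope and serde_json is available; the `rename_all = "camelCase"` attributes map `current_version` to `currentVersion`, `min_ack` to `minAck`, and so on:

fn main() {
	// Hypothetical values for a cluster with a single live layout version.
	let resp = GetClusterLayoutHistoryResponse {
		current_version: 2,
		min_ack: 2,
		versions: vec![ClusterLayoutVersion {
			version: 2,
			status: ClusterLayoutVersionStatus::Current,
			storage_nodes: 3,
			gateway_nodes: 1,
		}],
		// Serialized as `"updateTrackers": null` when only one version is live.
		update_trackers: None,
	};
	// Prints the camelCase JSON described by the OpenAPI schema above.
	println!("{}", serde_json::to_string_pretty(&resp).unwrap());
}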

View file

@@ -240,6 +240,89 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
}
}
impl RequestHandler for GetClusterLayoutHistoryRequest {
type Response = GetClusterLayoutHistoryResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterLayoutHistoryResponse, Error> {
let layout = garage.system.cluster_layout();
let layout = layout.inner();
let min_stored = layout.min_stored();
let versions = layout
.versions
.iter()
.rev()
.chain(layout.old_versions.iter().rev())
.map(|ver| {
let status = if ver.version == layout.current().version {
ClusterLayoutVersionStatus::Current
} else if ver.version >= min_stored {
ClusterLayoutVersionStatus::Draining
} else {
ClusterLayoutVersionStatus::Historical
};
ClusterLayoutVersion {
version: ver.version,
status,
storage_nodes: ver
.roles
.items()
.iter()
.filter(
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_some()),
)
.count() as u64,
gateway_nodes: ver
.roles
.items()
.iter()
.filter(
|(_, _, x)| matches!(x, layout::NodeRoleV(Some(c)) if c.capacity.is_none()),
)
.count() as u64,
}
})
.collect::<Vec<_>>();
let all_nodes = layout.get_all_nodes();
let min_ack = layout
.update_trackers
.ack_map
.min_among(&all_nodes, layout.min_stored());
let update_trackers = if layout.versions.len() > 1 {
Some(
all_nodes
.iter()
.map(|node| {
(
hex::encode(&node),
NodeUpdateTrackers {
ack: layout.update_trackers.ack_map.get(node, min_stored),
sync: layout.update_trackers.sync_map.get(node, min_stored),
sync_ack: layout.update_trackers.sync_ack_map.get(node, min_stored),
},
)
})
.collect(),
)
} else {
None
};
Ok(GetClusterLayoutHistoryResponse {
current_version: layout.current().version,
min_ack,
versions,
update_trackers,
})
}
}
// ----
// ---- update functions ----
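The status classification in the handler above reduces to a small pure rule; a hedged sketch for clarity (the helper name classify_version is hypothetical and does not appear in the patch):

fn classify_version(version: u64, current: u64, min_stored: u64) -> ClusterLayoutVersionStatus {
	if version == current {
		// The live layout version that writes are directed to.
		ClusterLayoutVersionStatus::Current
	} else if version >= min_stored {
		// Superseded, but some nodes may still store data under it.
		ClusterLayoutVersionStatus::Draining
	} else {
		// Older than anything still stored; kept only as history.
		ClusterLayoutVersionStatus::Historical
	}
}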

View file

@@ -88,6 +88,19 @@ Returns the cluster's current layout, including:
)]
fn GetClusterLayout() -> () {}
#[utoipa::path(get,
path = "/v2/GetClusterLayoutHistory",
tag = "Cluster layout",
description = "
Returns the history of layouts in the cluster
",
responses(
(status = 200, description = "Cluster layout history", body = GetClusterLayoutHistoryResponse),
(status = 500, description = "Internal server error")
),
)]
fn GetClusterLayoutHistory() -> () {}
#[utoipa::path(post,
path = "/v2/UpdateClusterLayout",
tag = "Cluster layout",
@@ -700,6 +713,7 @@ impl Modify for SecurityAddon {
ConnectClusterNodes,
// Layout operations
GetClusterLayout,
GetClusterLayoutHistory,
UpdateClusterLayout,
PreviewClusterLayoutChanges,
ApplyClusterLayout,

View file

@@ -36,6 +36,7 @@ impl AdminApiRequest {
POST ConnectClusterNodes (body),
// Layout endpoints
GET GetClusterLayout (),
GET GetClusterLayoutHistory (),
POST UpdateClusterLayout (body),
POST PreviewClusterLayoutChanges (),
POST ApplyClusterLayout (body),

View file

@@ -1,4 +1,3 @@
use format_table::format_table;
use garage_util::error::*;
use garage_rpc::layout::*;
@@ -7,100 +6,6 @@ use garage_rpc::*;
use crate::cli::structs::*;
pub async fn cmd_layout_history(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let min_stored = layout.min_stored();
println!("==== LAYOUT HISTORY ====");
let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
for ver in layout
.versions
.iter()
.rev()
.chain(layout.old_versions.iter().rev())
{
let status = if ver.version == layout.current().version {
"current"
} else if ver.version >= min_stored {
"draining"
} else {
"historical"
};
table.push(format!(
"#{}\t{}\t{}\t{}",
ver.version,
status,
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_some()))
.count(),
ver.roles
.items()
.iter()
.filter(|(_, _, x)| matches!(x, NodeRoleV(Some(c)) if c.capacity.is_none()))
.count(),
));
}
format_table(table);
println!();
if layout.versions.len() > 1 {
println!("==== UPDATE TRACKERS ====");
println!("Several layout versions are currently live in the cluster, and data is being migrated.");
println!(
"This is the internal data that Garage stores to know which nodes have what data."
);
println!();
let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
let all_nodes = layout.get_all_nodes();
for node in all_nodes.iter() {
table.push(format!(
"{:?}\t#{}\t#{}\t#{}",
node,
layout.update_trackers.ack_map.get(node, min_stored),
layout.update_trackers.sync_map.get(node, min_stored),
layout.update_trackers.sync_ack_map.get(node, min_stored),
));
}
table[1..].sort();
format_table(table);
let min_ack = layout
.update_trackers
.ack_map
.min_among(&all_nodes, layout.min_stored());
println!();
println!(
"If some nodes are not catching up to the latest layout version in the update trackers,"
);
println!("it might be because they are offline or unable to complete a sync successfully.");
if min_ack < layout.current().version {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {}`",
layout.current().version
);
} else {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.",
layout.current().version
);
}
} else {
println!("Your cluster is currently in a stable state with a single live layout version.");
println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
println!(
"so you might want to keep old nodes online until their data directories become empty."
);
}
Ok(())
}
pub async fn cmd_layout_skip_dead_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
@@ -162,7 +67,7 @@ pub async fn cmd_layout_skip_dead_nodes(
// --- utility ---
-pub async fn fetch_status(
+async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
@@ -175,7 +80,7 @@ pub async fn fetch_status(
}
}
-pub async fn fetch_layout(
+async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<LayoutHistory, Error> {
@@ -188,7 +93,7 @@ pub async fn fetch_layout(
}
}
-pub async fn send_layout(
+async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
layout: LayoutHistory,

View file

@@ -19,11 +19,9 @@ impl Cli {
LayoutOperation::Config(config_opt) => self.cmd_config_layout(config_opt).await,
LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
+LayoutOperation::History => self.cmd_layout_history().await,
-// TODO
-LayoutOperation::History => {
-cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await
-}
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cli_v1::cmd_layout_skip_dead_nodes(
&self.system_rpc_endpoint,
@@ -244,6 +242,68 @@ To know the correct value of the new layout version, invoke `garage layout show`
println!("All proposed role changes in cluster layout have been canceled.");
Ok(())
}
pub async fn cmd_layout_history(&self) -> Result<(), Error> {
let history = self.api_request(GetClusterLayoutHistoryRequest).await?;
println!("==== LAYOUT HISTORY ====");
let mut table = vec!["Version\tStatus\tStorage nodes\tGateway nodes".to_string()];
for ver in history.versions.iter() {
table.push(format!(
"#{}\t{:?}\t{}\t{}",
ver.version, ver.status, ver.storage_nodes, ver.gateway_nodes,
));
}
format_table(table);
println!();
if let Some(update_trackers) = history.update_trackers {
println!("==== UPDATE TRACKERS ====");
println!("Several layout versions are currently live in the cluster, and data is being migrated.");
println!(
"This is the internal data that Garage stores to know which nodes have what data."
);
println!();
let mut table = vec!["Node\tAck\tSync\tSync_ack".to_string()];
for (node, trackers) in update_trackers.iter() {
table.push(format!(
"{:.16}\t#{}\t#{}\t#{}",
node, trackers.ack, trackers.sync, trackers.sync_ack,
));
}
table[1..].sort();
format_table(table);
println!();
println!(
"If some nodes are not catching up to the latest layout version in the update trackers,"
);
println!(
"it might be because they are offline or unable to complete a sync successfully."
);
if history.min_ack < history.current_version {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {}`",
history.current_version
);
} else {
println!(
"You may force progress using `garage layout skip-dead-nodes --version {} --allow-missing-data`.",
history.current_version
);
}
} else {
println!(
"Your cluster is currently in a stable state with a single live layout version."
);
println!("No metadata migration is in progress. Note that the migration of data blocks is not tracked,");
println!(
"so you might want to keep old nodes online until their data directories become empty."
);
}
Ok(())
}
}
// --------------------------
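For illustration, on a hypothetical stable three-node cluster the reworked `garage layout history` would print roughly the following (column alignment approximate):

==== LAYOUT HISTORY ====
Version  Status      Storage nodes  Gateway nodes
#2       Current     3              1
#1       Historical  3              1

Your cluster is currently in a stable state with a single live layout version.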