admin api: implement ClusterLayoutSkipDeadNodes and use it in CLI
All checks were successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2025-03-06 18:49:56 +01:00
parent 3d94eb8d4b
commit 0951b5db75
10 changed files with 190 additions and 126 deletions

View file

@ -157,6 +157,40 @@
}
}
},
"/v2/ClusterLayoutSkipDeadNodes": {
"post": {
"tags": [
"Cluster layout"
],
"description": "Force progress in layout update trackers",
"operationId": "ClusterLayoutSkipDeadNodes",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Request has been taken into account",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ClusterLayoutSkipDeadNodesResponse"
}
}
}
},
"500": {
"description": "Internal server error"
}
}
}
},
"/v2/ConnectClusterNodes": {
"post": {
"tags": [
@ -1624,6 +1658,44 @@
}
}
},
"ClusterLayoutSkipDeadNodesRequest": {
"type": "object",
"required": [
"version",
"allowMissingData"
],
"properties": {
"allowMissingData": {
"type": "boolean"
},
"version": {
"type": "integer",
"format": "int64",
"minimum": 0
}
}
},
"ClusterLayoutSkipDeadNodesResponse": {
"type": "object",
"required": [
"ackUpdated",
"syncUpdated"
],
"properties": {
"ackUpdated": {
"type": "array",
"items": {
"type": "string"
}
},
"syncUpdated": {
"type": "array",
"items": {
"type": "string"
}
}
}
},
"ClusterLayoutVersion": {
"type": "object",
"required": [

View file

@ -56,6 +56,7 @@ admin_endpoints![
PreviewClusterLayoutChanges,
ApplyClusterLayout,
RevertClusterLayout,
ClusterLayoutSkipDeadNodes,
// Access key operations
ListKeys,
@ -422,6 +423,22 @@ pub struct RevertClusterLayoutRequest;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- ClusterLayoutSkipDeadNodes ----
// Request body for the /v2/ClusterLayoutSkipDeadNodes admin endpoint:
// force progress in the layout update trackers for a given layout version.
// NOTE(review): plain `//` comments are used here instead of `///` so the
// utoipa-generated OpenAPI schema text is not altered.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutSkipDeadNodesRequest {
// Target layout version; serialized as `version` (camelCase).
pub version: u64,
// When true, also advance trackers for nodes that are up, accepting that
// a metadata sync may be skipped; serialized as `allowMissingData`.
pub allow_missing_data: bool,
}
// Response of /v2/ClusterLayoutSkipDeadNodes: the hex-encoded IDs of nodes
// whose ACK / SYNC update trackers were advanced by the request.
// NOTE(review): `//` comments (not `///`) to leave the OpenAPI schema as-is.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterLayoutSkipDeadNodesResponse {
// Hex node IDs whose ACK tracker was increased (serialized `ackUpdated`).
pub ack_updated: Vec<String>,
// Hex node IDs whose SYNC tracker was increased (serialized `syncUpdated`).
pub sync_updated: Vec<String>,
}
// **********************************************
// Access key operations
// **********************************************

View file

@ -465,6 +465,67 @@ impl RequestHandler for RevertClusterLayoutRequest {
}
}
impl RequestHandler for ClusterLayoutSkipDeadNodesRequest {
	type Response = ClusterLayoutSkipDeadNodesResponse;

	/// Force progress in the layout update trackers: advance the ACK tracker
	/// of dead nodes (or of all nodes when `allow_missing_data` is set, in
	/// which case the SYNC tracker is advanced as well) to `self.version`,
	/// then publish the modified layout.
	async fn handle(
		self,
		garage: &Arc<Garage>,
		_admin: &Admin,
	) -> Result<ClusterLayoutSkipDeadNodesResponse, Error> {
		let known_nodes = garage.system.get_known_nodes();
		let mut layout = garage.system.cluster_layout().inner().clone();

		// With a single live layout version there is nothing to skip past.
		if layout.versions.len() == 1 {
			return Err(Error::bad_request(
				"This command cannot be called when there is only one live cluster layout version",
			));
		}

		// The target version must be strictly newer than the oldest stored
		// version and no newer than the current one.
		let valid_versions = layout.min_stored() + 1..=layout.current().version;
		if !valid_versions.contains(&self.version) {
			return Err(Error::bad_request(format!(
				"Invalid version, you may use the following version numbers: {}",
				valid_versions
					.map(|x| x.to_string())
					.collect::<Vec<_>>()
					.join(" ")
			)));
		}

		let mut ack_updated = vec![];
		let mut sync_updated = vec![];
		let nodes = layout.get_all_nodes();
		for node in nodes.iter() {
			let node_is_up = known_nodes.iter().any(|x| x.id == *node && x.is_up);
			// ACK tracker: advanced for dead nodes, or for every node when
			// allow_missing_data is set.
			if (self.allow_missing_data || !node_is_up)
				&& layout.update_trackers.ack_map.set_max(*node, self.version)
			{
				ack_updated.push(hex::encode(node));
			}
			// SYNC tracker: only advanced (for all nodes) when
			// allow_missing_data is set.
			if self.allow_missing_data
				&& layout.update_trackers.sync_map.set_max(*node, self.version)
			{
				sync_updated.push(hex::encode(node));
			}
		}

		// Publish the layout with the updated trackers to the cluster.
		garage
			.system
			.layout_manager
			.update_cluster_layout(&layout)
			.await?;

		Ok(ClusterLayoutSkipDeadNodesResponse {
			ack_updated,
			sync_updated,
		})
	}
}
// ----
impl From<layout::ZoneRedundancy> for ZoneRedundancy {

View file

@ -172,6 +172,18 @@ fn ApplyClusterLayout() -> () {}
)]
fn RevertClusterLayout() -> () {}
// Spec-only marker function: the #[utoipa::path] attribute below emits the
// OpenAPI entry for POST /v2/ClusterLayoutSkipDeadNodes; the empty body is
// never executed. Kept in the same style as the surrounding endpoint stubs.
#[utoipa::path(post,
path = "/v2/ClusterLayoutSkipDeadNodes",
tag = "Cluster layout",
description = "Force progress in layout update trackers",
request_body = ClusterLayoutSkipDeadNodesRequest,
responses(
(status = 200, description = "Request has been taken into account", body = ClusterLayoutSkipDeadNodesResponse),
(status = 500, description = "Internal server error")
),
)]
fn ClusterLayoutSkipDeadNodes() -> () {}
// **********************************************
// Access key operations
// **********************************************
@ -718,6 +730,7 @@ impl Modify for SecurityAddon {
PreviewClusterLayoutChanges,
ApplyClusterLayout,
RevertClusterLayout,
ClusterLayoutSkipDeadNodes,
// Key operations
ListKeys,
GetKeyInfo,

View file

@ -41,6 +41,7 @@ impl AdminApiRequest {
POST PreviewClusterLayoutChanges (),
POST ApplyClusterLayout (body),
POST RevertClusterLayout (),
POST ClusterLayoutSkipDeadNodes (body),
// API key endpoints
GET GetKeyInfo (query_opt::id, query_opt::search, parse_default(false)::show_secret_key),
POST UpdateKey (body_field, query::id),

View file

@ -1,109 +0,0 @@
use garage_util::error::*;
use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
use crate::cli::structs::*;
pub async fn cmd_layout_skip_dead_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
opt: SkipDeadNodesOpt,
) -> Result<(), Error> {
let status = fetch_status(rpc_cli, rpc_host).await?;
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
if layout.versions.len() == 1 {
return Err(Error::Message(
"This command cannot be called when there is only one live cluster layout version"
.into(),
));
}
let min_v = layout.min_stored();
if opt.version <= min_v || opt.version > layout.current().version {
return Err(Error::Message(format!(
"Invalid version, you may use the following version numbers: {}",
(min_v + 1..=layout.current().version)
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(" ")
)));
}
let all_nodes = layout.get_all_nodes();
let mut did_something = false;
for node in all_nodes.iter() {
// Update ACK tracker for dead nodes or for all nodes if --allow-missing-data
if opt.allow_missing_data || !status.iter().any(|x| x.id == *node && x.is_up) {
if layout.update_trackers.ack_map.set_max(*node, opt.version) {
println!("Increased the ACK tracker for node {:?}", node);
did_something = true;
}
}
// If --allow-missing-data, update SYNC tracker for all nodes.
if opt.allow_missing_data {
if layout.update_trackers.sync_map.set_max(*node, opt.version) {
println!("Increased the SYNC tracker for node {:?}", node);
did_something = true;
}
}
}
if did_something {
send_layout(rpc_cli, rpc_host, layout).await?;
println!("Success.");
Ok(())
} else if !opt.allow_missing_data {
Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
} else {
Err(Error::Message(
"Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
))
}
}
// --- utility ---
async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<LayoutHistory, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
/// Push an updated layout history to the target node, which will propagate
/// it to the rest of the cluster.
async fn send_layout(
	rpc_cli: &Endpoint<SystemRpc, ()>,
	rpc_host: NodeID,
	layout: LayoutHistory,
) -> Result<(), Error> {
	let message = SystemRpc::AdvertiseClusterLayout(layout);
	rpc_cli.call(&rpc_host, message, PRIO_NORMAL).await??;
	Ok(())
}

View file

@ -3,5 +3,3 @@ pub(crate) mod structs;
pub(crate) mod convert_db;
pub(crate) mod init;
pub(crate) mod repair;
pub(crate) mod layout;

View file

@ -6,7 +6,6 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use garage_rpc::layout;
use crate::cli::layout as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@ -20,16 +19,7 @@ impl Cli {
LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
LayoutOperation::History => self.cmd_layout_history().await,
// TODO
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cli_v1::cmd_layout_skip_dead_nodes(
&self.system_rpc_endpoint,
self.rpc_host,
assume_sync_opt,
)
.await
}
LayoutOperation::SkipDeadNodes(opt) => self.cmd_skip_dead_nodes(opt).await,
}
}
@ -304,6 +294,31 @@ To know the correct value of the new layout version, invoke `garage layout show`
Ok(())
}
pub async fn cmd_skip_dead_nodes(&self, opt: SkipDeadNodesOpt) -> Result<(), Error> {
let res = self
.api_request(ClusterLayoutSkipDeadNodesRequest {
version: opt.version,
allow_missing_data: opt.allow_missing_data,
})
.await?;
if !res.sync_updated.is_empty() || !res.ack_updated.is_empty() {
for node in res.ack_updated.iter() {
println!("Increased the ACK tracker for node {:.16}", node);
}
for node in res.sync_updated.iter() {
println!("Increased the SYNC tracker for node {:.16}", node);
}
Ok(())
} else if !opt.allow_missing_data {
Err(Error::Message("Nothing was done, try passing the `--allow-missing-data` flag to force progress even when not enough nodes can complete a metadata sync.".into()))
} else {
Err(Error::Message(
"Sorry, there is nothing I can do for you. Please wait patiently. If you ask for help, please send the output of the `garage layout history` command.".into(),
))
}
}
}
// --------------------------

View file

@ -13,7 +13,6 @@ use std::time::Duration;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
@ -23,7 +22,6 @@ use garage_api_admin::RequestHandler;
use crate::cli::structs::*;
// Shared context for the v2 CLI commands: RPC endpoints plus the identity
// of the node the CLI is connected to.
pub struct Cli {
// Endpoint for system-level RPC (node status, layout pull/advertise, ...)
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
// Endpoint for proxied requests to other nodes
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
// Public-key ID of the RPC peer all commands are sent to
pub rpc_host: NodeID,
}

View file

@ -289,11 +289,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
Err(e).err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct full-length node ID (public key).")?;
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
proxy_rpc_endpoint,
rpc_host: id,
};