admin api: implement SetWorkerVariable
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2025-01-30 20:39:03 +01:00
parent 783c1e1c70
commit cf30ae8b99
6 changed files with 71 additions and 57 deletions

View file

@ -78,9 +78,10 @@ admin_endpoints![
// Worker operations
GetWorkerVariable,
SetWorkerVariable,
];
local_admin_endpoints![GetWorkerVariable,];
local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
// **********************************************
// Special endpoints
@ -594,6 +595,8 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// Worker operations
// **********************************************
// ---- GetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerVariableRequest {
pub variable: Option<String>,
@ -601,3 +604,17 @@ pub struct LocalGetWorkerVariableRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
// ---- SetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSetWorkerVariableRequest {
pub variable: String,
pub value: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSetWorkerVariableResponse {
pub variable: String,
pub value: String,
}

View file

@ -30,3 +30,21 @@ impl RequestHandler for LocalGetWorkerVariableRequest {
Ok(LocalGetWorkerVariableResponse(res))
}
}
#[async_trait]
impl RequestHandler for LocalSetWorkerVariableRequest {
type Response = LocalSetWorkerVariableResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalSetWorkerVariableResponse, Error> {
garage.bg_vars.set(&self.variable, &self.value)?;
Ok(LocalSetWorkerVariableResponse {
variable: self.variable,
value: self.value,
})
}
}

View file

@ -46,7 +46,6 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@ -354,54 +353,10 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.handle_set_var(*all_nodes, variable, value).await,
_ => unreachable!(),
}
}
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
self.garage.bg_vars.set(variable, value)?;
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
variable.to_string(),
value.to_string(),
)]))
}
}
// ================ META DB COMMANDS ====================
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {

View file

@ -20,9 +20,6 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
AdminRpc::WorkerVars(wv) => {
print_worker_vars(wv);
}
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}

View file

@ -126,14 +126,6 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
let table = wv
.into_iter()
.map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
.collect::<Vec<_>>();
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();

View file

@ -15,6 +15,11 @@ impl Cli {
all_nodes,
variable,
} => self.cmd_get_var(all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.cmd_set_var(all_nodes, variable, value).await,
wo => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
@ -51,4 +56,34 @@ impl Cli {
Ok(())
}
pub async fn cmd_set_var(
&self,
all: bool,
variable: String,
value: String,
) -> Result<(), Error> {
let res = self
.api_request(SetWorkerVariableRequest {
node: if all {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalSetWorkerVariableRequest { variable, value },
})
.await?;
let mut table = vec![];
for (node, kv) in res.success.iter() {
table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
}
format_table(table);
for (node, err) in res.error.iter() {
eprintln!("{:.16}: error: {}", node, err);
}
Ok(())
}
}