bg var operation on all nodes at once

This commit is contained in:
Alex 2023-01-04 13:25:57 +01:00
parent f3f27293df
commit 29dbcb8278
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
4 changed files with 117 additions and 23 deletions

View file

@ -59,7 +59,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>, HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt, WorkerListOpt,
), ),
WorkerVars(Vec<(String, String)>), WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo), WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>), BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo { BlockInfo {
@ -943,30 +943,104 @@ impl AdminRpcHandler {
.clone(); .clone();
Ok(AdminRpc::WorkerInfo(*tid, info)) Ok(AdminRpc::WorkerInfo(*tid, info))
} }
WorkerOperation::Get { variable } => { WorkerOperation::Get {
if let Some(v) = variable { all_nodes,
Ok(AdminRpc::WorkerVars(vec![( variable,
v.clone(), } => self.handle_get_var(*all_nodes, variable).await,
self.garage.bg_vars.get(&v)?, WorkerOperation::Set {
)])) all_nodes,
} else { variable,
Ok(AdminRpc::WorkerVars( value,
self.garage } => self.handle_set_var(*all_nodes, variable, value).await,
.bg_vars }
.get_all() }
.into_iter()
.map(|(k, v)| (k.to_string(), v)) async fn handle_get_var(
.collect(), &self,
)) all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
} }
} }
WorkerOperation::Set { variable, value } => { Ok(AdminRpc::WorkerVars(ret))
self.garage.bg_vars.set(&variable, &value)?; } else {
Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value))) #[allow(clippy::collapsible_else_if)]
if let Some(v) = variable {
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
} }
} }
} }
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
self.garage.bg_vars.set(variable, value)?;
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
variable.to_string(),
value.to_string(),
)]))
}
}
// ================ BLOCK COMMANDS ==================== // ================ BLOCK COMMANDS ====================
async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> { async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {

View file

@ -519,10 +519,24 @@ pub enum WorkerOperation {
Info { tid: usize }, Info { tid: usize },
/// Get worker parameter /// Get worker parameter
#[structopt(name = "get", version = garage_version())] #[structopt(name = "get", version = garage_version())]
Get { variable: Option<String> }, Get {
/// Gather variable values from all nodes
#[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable name to get, or none to get all variables
variable: Option<String>,
},
/// Set worker parameter /// Set worker parameter
#[structopt(name = "set", version = garage_version())] #[structopt(name = "set", version = garage_version())]
Set { variable: String, value: String }, Set {
/// Set variable values on all nodes
#[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable node to set
variable: String,
/// Value to set the variable to
value: String,
},
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]

View file

@ -357,10 +357,10 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table); format_table(table);
} }
pub fn print_worker_vars(wv: Vec<(String, String)>) { pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
let table = wv let table = wv
.into_iter() .into_iter()
.map(|(k, v)| format!("{}\t{}", k, v)) .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
format_table(table); format_table(table);
} }

View file

@ -71,6 +71,12 @@ impl BgVars {
} }
} }
impl Default for BgVars {
fn default() -> Self {
Self::new()
}
}
// ---- // ----
trait BgVarTrait: Send + Sync + 'static { trait BgVarTrait: Send + Sync + 'static {